This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 61d09518e96a05222919d22b69654f2d3eb49245
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 19 13:45:21 2024 +0800

    [enhancement](memtable) make memtable memusage more accurate (#40912)
    
    ## Proposed changes
    
    1. Add a memtype to memtable, and keep a vector of weak pointers to the
    memtables in the memtable writer, so that the memory usage of each
    memtype can be computed by traversing the vector.
    2. Use a scoped memory tracker to compute the memory usage of a memtable.
    3. Check that the tracker's consumption is 0 when the memtable has been flushed successfully.
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/common/config.cpp                |  2 --
 be/src/common/config.h                  |  2 --
 be/src/olap/memtable.cpp                | 40 +++++++++++++--------------------
 be/src/olap/memtable.h                  | 37 +++++++++++++++---------------
 be/src/olap/memtable_flush_executor.cpp | 24 +++++++++-----------
 be/src/olap/memtable_flush_executor.h   |  4 ++--
 be/src/olap/memtable_memory_limiter.cpp |  4 +++-
 be/src/olap/memtable_writer.cpp         | 40 +++++++++++++--------------------
 be/src/olap/memtable_writer.h           | 11 ++++-----
 be/src/runtime/tablets_channel.cpp      |  2 +-
 10 files changed, 70 insertions(+), 96 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 963d770276a..5a34ec1957c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -604,8 +604,6 @@ DEFINE_mInt32(memtable_hard_limit_active_percent, "50");
 // percent of (active memtables size / all memtables size) when reach soft 
limit
 DEFINE_mInt32(memtable_soft_limit_active_percent, "50");
 
-// memtable insert memory tracker will multiply input block size with this 
ratio
-DEFINE_mDouble(memtable_insert_memory_ratio, "1.4");
 // max write buffer size before flush, default 200MB
 DEFINE_mInt64(write_buffer_size, "209715200");
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ce42bd47fc1..0a467f191b1 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -654,8 +654,6 @@ DECLARE_mInt32(memtable_hard_limit_active_percent);
 // percent of (active memtables size / all memtables size) when reach soft 
limit
 DECLARE_mInt32(memtable_soft_limit_active_percent);
 
-// memtable insert memory tracker will multiply input block size with this 
ratio
-DECLARE_mDouble(memtable_insert_memory_ratio);
 // max write buffer size before flush, default 200MB
 DECLARE_mInt64(write_buffer_size);
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 878cea8fb53..3cb2594b845 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -50,20 +50,16 @@ using namespace ErrorCode;
 
 MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> 
tablet_schema,
                    const std::vector<SlotDescriptor*>* slot_descs, 
TupleDescriptor* tuple_desc,
-                   bool enable_unique_key_mow, PartialUpdateInfo* 
partial_update_info,
-                   const std::shared_ptr<MemTracker>& insert_mem_tracker,
-                   const std::shared_ptr<MemTracker>& flush_mem_tracker)
-        : _tablet_id(tablet_id),
+                   bool enable_unique_key_mow, PartialUpdateInfo* 
partial_update_info)
+        : _mem_type(MemType::ACTIVE),
+          _tablet_id(tablet_id),
           _enable_unique_key_mow(enable_unique_key_mow),
           _keys_type(tablet_schema->keys_type()),
           _tablet_schema(tablet_schema),
-          _insert_mem_tracker(insert_mem_tracker),
-          _flush_mem_tracker(flush_mem_tracker),
           _is_first_insertion(true),
           _agg_functions(tablet_schema->num_columns()),
           _offsets_of_aggregate_states(tablet_schema->num_columns()),
-          _total_size_of_aggregate_states(0),
-          _mem_usage(0) {
+          _total_size_of_aggregate_states(0) {
     g_memtable_cnt << 1;
     _query_thread_context.init_unlocked();
     _arena = std::make_unique<vectorized::Arena>();
@@ -82,6 +78,7 @@ MemTable::MemTable(int64_t tablet_id, 
std::shared_ptr<TabletSchema> tablet_schem
     }
     // TODO: Support ZOrderComparator in the future
     _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
+    _mem_tracker = std::make_shared<MemTracker>();
 }
 
 void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
@@ -142,6 +139,13 @@ void MemTable::_init_agg_functions(const 
vectorized::Block* block) {
 
 MemTable::~MemTable() {
     
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
+    if (_is_flush_success) {
+        // If the memtable is flush success, then its memtracker's consumption 
should be 0
+        if (_mem_tracker->consumption() != 0 && 
config::crash_in_memory_tracker_inaccurate) {
+            LOG(FATAL) << "memtable flush success but cosumption is not 0, it 
is "
+                       << _mem_tracker->consumption();
+        }
+    }
     g_memtable_input_block_allocated_size << 
-_input_mutable_block.allocated_bytes();
     g_memtable_cnt << -1;
     if (_keys_type != KeysType::DUP_KEYS) {
@@ -159,13 +163,7 @@ MemTable::~MemTable() {
         }
     }
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), 
std::default_delete<RowInBlock>());
-    _insert_mem_tracker->release(_mem_usage);
-    _flush_mem_tracker->set_consumption(0);
-    DCHECK_EQ(_insert_mem_tracker->consumption(), 0) << std::endl
-                                                     << 
_insert_mem_tracker->log_usage();
-    DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
     _arena.reset();
-    _agg_buffer_pool.clear();
     _vec_row_comparator.reset();
     _row_in_blocks.clear();
     _agg_functions.clear();
@@ -180,6 +178,7 @@ int RowInBlockComparator::operator()(const RowInBlock* 
left, const RowInBlock* r
 
 Status MemTable::insert(const vectorized::Block* input_block,
                         const std::vector<uint32_t>& row_idxs) {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     if (_is_first_insertion) {
         _is_first_insertion = false;
         auto clone_block = input_block->clone_without_columns(&_column_offset);
@@ -214,10 +213,6 @@ Status MemTable::insert(const vectorized::Block* 
input_block,
                                                   row_idxs.data() + num_rows, 
&_column_offset));
     auto block_size1 = _input_mutable_block.allocated_bytes();
     g_memtable_input_block_allocated_size << block_size1 - block_size0;
-    auto input_size = size_t(input_block->bytes() * num_rows / 
input_block->rows() *
-                             config::memtable_insert_memory_ratio);
-    _mem_usage += input_size;
-    _insert_mem_tracker->consume(input_size);
     for (int i = 0; i < num_rows; i++) {
         _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + 
i});
     }
@@ -467,10 +462,6 @@ void MemTable::_aggregate() {
     }
     if constexpr (!is_final) {
         // if is not final, we collect the agg results to input_block and then 
continue to insert
-        size_t shrunked_after_agg = _output_mutable_block.allocated_bytes();
-        // flush will not run here, so will not duplicate `_flush_mem_tracker`
-        _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage);
-        _mem_usage = shrunked_after_agg;
         _input_mutable_block.swap(_output_mutable_block);
         //TODO(weixang):opt here.
         std::unique_ptr<vectorized::Block> empty_input_block = 
in_block.create_same_struct_block(0);
@@ -483,6 +474,7 @@ void MemTable::_aggregate() {
 }
 
 void MemTable::shrink_memtable_by_agg() {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     if (_keys_type == KeysType::DUP_KEYS) {
         return;
     }
@@ -528,8 +520,8 @@ Status 
MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
     }
     g_memtable_input_block_allocated_size << 
-_input_mutable_block.allocated_bytes();
     _input_mutable_block.clear();
-    _insert_mem_tracker->release(_mem_usage);
-    _mem_usage = 0;
+    // After to block, all data in arena is saved in the block
+    _arena.reset();
     *res = vectorized::Block::create_unique(_output_mutable_block.to_block());
     return Status::OK();
 }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 70f7a9f22a0..4ae92c2d2d8 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -47,6 +47,11 @@ class TabletSchema;
 class TupleDescriptor;
 enum KeysType : int;
 
+// Active: the memtable is currently used by writer to insert into blocks
+// Write_finished: the memtable finished write blocks and in the queue waiting 
for flush
+// FLUSH: the memtable is under flushing, write segment to disk.
+enum MemType { ACTIVE = 0, WRITE_FINISHED = 1, FLUSH = 2 };
+
 // row pos in _input_mutable_block
 struct RowInBlock {
     size_t _row_pos;
@@ -171,16 +176,11 @@ class MemTable {
 public:
     MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* 
tuple_desc,
-             bool enable_unique_key_mow, PartialUpdateInfo* 
partial_update_info,
-             const std::shared_ptr<MemTracker>& insert_mem_tracker,
-             const std::shared_ptr<MemTracker>& flush_mem_tracker);
+             bool enable_unique_key_mow, PartialUpdateInfo* 
partial_update_info);
     ~MemTable();
 
     int64_t tablet_id() const { return _tablet_id; }
-    size_t memory_usage() const {
-        return _insert_mem_tracker->consumption() + _arena->used_size() +
-               _flush_mem_tracker->consumption();
-    }
+    size_t memory_usage() const { return _mem_tracker->consumption(); }
     // insert tuple from (row_pos) to (row_pos+num_rows)
     Status insert(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
 
@@ -196,10 +196,16 @@ public:
 
     const MemTableStat& stat() { return _stat; }
 
-    std::shared_ptr<MemTracker> flush_mem_tracker() { return 
_flush_mem_tracker; }
-
     QueryThreadContext query_thread_context() { return _query_thread_context; }
 
+    std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
+
+    void set_flush_success() { _is_flush_success = true; }
+
+    MemType get_mem_type() { return _mem_type; }
+
+    void update_mem_type(MemType memtype) { _mem_type = memtype; }
+
 private:
     // for vectorized
     void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, 
RowInBlock* new_row,
@@ -209,9 +215,11 @@ private:
     Status _to_block(std::unique_ptr<vectorized::Block>* res);
 
 private:
+    std::atomic<MemType> _mem_type;
     int64_t _tablet_id;
     bool _enable_unique_key_mow = false;
     bool _is_partial_update = false;
+    bool _is_flush_success = false;
     const KeysType _keys_type;
     std::shared_ptr<TabletSchema> _tablet_schema;
 
@@ -219,18 +227,11 @@ private:
 
     QueryThreadContext _query_thread_context;
 
-    // `_insert_manual_mem_tracker` manually records the memory value of 
memtable insert()
-    // `_flush_hook_mem_tracker` automatically records the memory value of 
memtable flush() through mem hook.
-    // Is used to flush when _insert_manual_mem_tracker larger than 
write_buffer_size and run flush memtable
-    // when the sum of all memtable (_insert_manual_mem_tracker + 
_flush_hook_mem_tracker) exceeds the limit.
-    std::shared_ptr<MemTracker> _insert_mem_tracker;
-    std::shared_ptr<MemTracker> _flush_mem_tracker;
+    std::shared_ptr<MemTracker> _mem_tracker;
     // Only the rows will be inserted into block can allocate memory from 
_arena.
     // In this way, we can make MemTable::memory_usage() to be more accurate, 
and eventually
     // reduce the number of segment files that are generated by current load
     std::unique_ptr<vectorized::Arena> _arena;
-    // The object buffer pool for convert tuple to row
-    ObjectPool _agg_buffer_pool;
 
     void _init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
                                             const TupleDescriptor* tuple_desc);
@@ -264,8 +265,6 @@ private:
     std::vector<size_t> _offsets_of_aggregate_states;
     size_t _total_size_of_aggregate_states;
     std::vector<RowInBlock*> _row_in_blocks;
-    // Memory usage without _arena.
-    size_t _mem_usage;
 
     size_t _num_columns;
     int32_t _seq_col_idx_in_block = -1;
diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index 887340eed70..dc911647be8 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -46,10 +46,10 @@ class MemtableFlushTask final : public Runnable {
     ENABLE_FACTORY_CREATOR(MemtableFlushTask);
 
 public:
-    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, 
std::unique_ptr<MemTable> memtable,
+    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, 
std::shared_ptr<MemTable> memtable,
                       int32_t segment_id, int64_t submit_task_time)
             : _flush_token(flush_token),
-              _memtable(std::move(memtable)),
+              _memtable(memtable),
               _segment_id(segment_id),
               _submit_task_time(submit_task_time) {
         g_flush_task_num << 1;
@@ -60,7 +60,7 @@ public:
     void run() override {
         auto token = _flush_token.lock();
         if (token) {
-            token->_flush_memtable(std::move(_memtable), _segment_id, 
_submit_task_time);
+            token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
         } else {
             LOG(WARNING) << "flush token is deconstructed, ignore the flush 
task";
         }
@@ -68,7 +68,7 @@ public:
 
 private:
     std::weak_ptr<FlushToken> _flush_token;
-    std::unique_ptr<MemTable> _memtable;
+    std::shared_ptr<MemTable> _memtable;
     int32_t _segment_id;
     int64_t _submit_task_time;
 };
@@ -83,7 +83,7 @@ std::ostream& operator<<(std::ostream& os, const 
FlushStatistic& stat) {
     return os;
 }
 
-Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
+Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
     {
         std::shared_lock rdlk(_flush_status_lock);
         DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
@@ -98,9 +98,8 @@ Status FlushToken::submit(std::unique_ptr<MemTable> 
mem_table) {
         return Status::OK();
     }
     int64_t submit_task_time = MonotonicNanos();
-    auto task = MemtableFlushTask::create_shared(shared_from_this(), 
std::move(mem_table),
-                                                 
_rowset_writer->allocate_segment_id(),
-                                                 submit_task_time);
+    auto task = MemtableFlushTask::create_shared(
+            shared_from_this(), mem_table, 
_rowset_writer->allocate_segment_id(), submit_task_time);
     Status ret = _thread_pool->submit(std::move(task));
     if (ret.ok()) {
         // _wait_running_task_finish was executed after this function, so no 
need to notify _cond here
@@ -136,20 +135,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, 
int32_t segment_id, in
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << 
memtable->tablet_id()
                   << ", memsize: " << memtable->memory_usage()
                   << ", rows: " << memtable->stat().raw_rows;
+    memtable->update_mem_type(MemType::FLUSH);
     int64_t duration_ns;
     SCOPED_RAW_TIMER(&duration_ns);
     SCOPED_ATTACH_TASK(memtable->query_thread_context());
     signal::set_signal_task_id(_rowset_writer->load_id());
     signal::tablet_id = memtable->tablet_id();
     {
+        SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
         std::unique_ptr<vectorized::Block> block;
-        // During to block method, it will release old memory and create new 
block, so that
-        // we could not scoped it.
         RETURN_IF_ERROR(memtable->to_block(&block));
-        memtable->flush_mem_tracker()->consume(block->allocated_bytes());
-        SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
         RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), 
segment_id, flush_size));
     }
+    memtable->set_flush_success();
     _memtable_stat += memtable->stat();
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 
1000);
@@ -158,7 +156,7 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, 
int32_t segment_id, in
     return Status::OK();
 }
 
-void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, 
int32_t segment_id,
+void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, 
int32_t segment_id,
                                  int64_t submit_task_time) {
     Defer defer {[&]() {
         std::lock_guard<std::mutex> lock(_mutex);
diff --git a/be/src/olap/memtable_flush_executor.h 
b/be/src/olap/memtable_flush_executor.h
index 2d20298f800..25c5a37afba 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -61,7 +61,7 @@ class FlushToken : public 
std::enable_shared_from_this<FlushToken> {
 public:
     FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), 
_thread_pool(thread_pool) {}
 
-    Status submit(std::unique_ptr<MemTable> mem_table);
+    Status submit(std::shared_ptr<MemTable> mem_table);
 
     // error has happens, so we cancel this token
     // And remove all tasks in the queue.
@@ -87,7 +87,7 @@ private:
 private:
     friend class MemtableFlushTask;
 
-    void _flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t 
segment_id,
+    void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t 
segment_id,
                          int64_t submit_task_time);
 
     Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* 
flush_size);
diff --git a/be/src/olap/memtable_memory_limiter.cpp 
b/be/src/olap/memtable_memory_limiter.cpp
index ea045b1e53e..9b9ce19f895 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -20,6 +20,7 @@
 #include <bvar/bvar.h>
 
 #include "common/config.h"
+#include "olap/memtable.h"
 #include "olap/memtable_writer.h"
 #include "util/doris_metrics.h"
 #include "util/mem_info.h"
@@ -237,13 +238,14 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
     _active_writers.clear();
     for (auto it = _writers.begin(); it != _writers.end();) {
         if (auto writer = it->lock()) {
+            // The memtable is currently used by writer to insert blocks.
             auto active_usage = writer->active_memtable_mem_consumption();
             _active_mem_usage += active_usage;
             if (active_usage > 0) {
                 _active_writers.push_back(writer);
             }
             _flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
-            _write_mem_usage += writer->mem_consumption(MemType::WRITE);
+            _write_mem_usage += 
writer->mem_consumption(MemType::WRITE_FINISHED);
             ++it;
         } else {
             *it = std::move(_writers.back());
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 59916d5f1cc..e8123c48ecc 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -133,12 +133,18 @@ Status MemTableWriter::write(const vectorized::Block* 
block,
 
 Status MemTableWriter::_flush_memtable_async() {
     DCHECK(_flush_token != nullptr);
-    std::unique_ptr<MemTable> memtable;
+    std::shared_ptr<MemTable> memtable;
     {
         std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
-        memtable = std::move(_mem_table);
+        memtable = _mem_table;
+        _mem_table = nullptr;
     }
-    return _flush_token->submit(std::move(memtable));
+    {
+        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+        memtable->update_mem_type(MemType::WRITE_FINISHED);
+        _freezed_mem_tables.push_back(memtable);
+    }
+    return _flush_token->submit(memtable);
 }
 
 Status MemTableWriter::flush_async() {
@@ -187,22 +193,10 @@ Status MemTableWriter::wait_flush() {
 }
 
 void MemTableWriter::_reset_mem_table() {
-    auto mem_table_insert_tracker = std::make_shared<MemTracker>(fmt::format(
-            "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
-            std::to_string(tablet_id()), _mem_table_num, 
UniqueId(_req.load_id).to_string()));
-    auto mem_table_flush_tracker = std::make_shared<MemTracker>(fmt::format(
-            "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", 
std::to_string(tablet_id()),
-            _mem_table_num++, UniqueId(_req.load_id).to_string()));
-    {
-        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
-        _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
-        _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
-    }
     {
         std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
         _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, 
_req.slots, _req.tuple_desc,
-                                      _unique_key_mow, 
_partial_update_info.get(),
-                                      mem_table_insert_tracker, 
mem_table_flush_tracker));
+                                      _unique_key_mow, 
_partial_update_info.get()));
     }
 
     _segment_num++;
@@ -353,15 +347,11 @@ int64_t MemTableWriter::mem_consumption(MemType mem) {
     }
     int64_t mem_usage = 0;
     {
-        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
-        if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2
-            for (const auto& mem_table_tracker : _mem_table_insert_trackers) {
-                mem_usage += mem_table_tracker->consumption();
-            }
-        }
-        if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1
-            for (const auto& mem_table_tracker : _mem_table_flush_trackers) {
-                mem_usage += mem_table_tracker->consumption();
+        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+        for (const auto& mem_table : _freezed_mem_tables) {
+            auto mem_table_sptr = mem_table.lock();
+            if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == 
mem) {
+                mem_usage += mem_table_sptr->memory_usage();
             }
         }
     }
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index ee7c8e1538a..ec44348b4a9 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -57,8 +57,6 @@ namespace vectorized {
 class Block;
 } // namespace vectorized
 
-enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 };
-
 // Writer for a particular (load, index, tablet).
 // This class is NOT thread-safe, external synchronization is required.
 class MemTableWriter {
@@ -123,18 +121,17 @@ private:
     Status _cancel_status;
     WriteRequest _req;
     std::shared_ptr<RowsetWriter> _rowset_writer;
-    std::unique_ptr<MemTable> _mem_table;
+    std::shared_ptr<MemTable> _mem_table;
     TabletSchemaSPtr _tablet_schema;
     bool _unique_key_mow = false;
 
     // This variable is accessed from writer thread and token flush thread
     // use a shared ptr to avoid use after free problem.
     std::shared_ptr<FlushToken> _flush_token;
-    std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
-    std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
-    SpinLock _mem_table_tracker_lock;
+    // Save the not active memtable that is in flush queue or under flushing.
+    std::vector<std::weak_ptr<MemTable>> _freezed_mem_tables;
+    // The lock to protect _memtable and _freezed_mem_tables structure to 
avoid concurrency modification or read
     SpinLock _mem_table_ptr_lock;
-    std::atomic<uint32_t> _mem_table_num = 1;
     QueryThreadContext _query_thread_context;
 
     std::mutex _lock;
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 329366766f8..4d458cd440f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -446,7 +446,7 @@ void BaseTabletsChannel::refresh_profile() {
     {
         std::lock_guard<SpinLock> l(_tablet_writers_lock);
         for (auto&& [tablet_id, writer] : _tablet_writers) {
-            int64_t write_mem = writer->mem_consumption(MemType::WRITE);
+            int64_t write_mem = 
writer->mem_consumption(MemType::WRITE_FINISHED);
             write_mem_usage += write_mem;
             int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
             flush_mem_usage += flush_mem;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to