This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cf1efc997 [refactor](load) use smart pointers to manage writers in 
memtable memory limiter (#23019)
6cf1efc997 is described below

commit 6cf1efc997ea959f6b137f51dd646e53a4d1825c
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Aug 16 16:34:57 2023 +0800

    [refactor](load) use smart pointers to manage writers in memtable memory 
limiter (#23019)
---
 be/src/olap/delta_writer.cpp                  | 22 +++----
 be/src/olap/delta_writer.h                    |  4 +-
 be/src/olap/memtable_memory_limiter.cpp       | 56 ++++++++++--------
 be/src/olap/memtable_memory_limiter.h         |  8 +--
 be/src/olap/memtable_writer.cpp               | 83 ++++++++++++++-------------
 be/src/olap/memtable_writer.h                 | 34 +++++------
 be/src/runtime/load_channel.h                 |  9 ---
 be/src/runtime/load_channel_mgr.cpp           |  4 --
 be/src/runtime/load_channel_mgr.h             |  7 ---
 be/src/runtime/tablets_channel.cpp            | 14 +----
 be/src/runtime/tablets_channel.h              |  5 +-
 be/test/olap/memtable_memory_limiter_test.cpp |  4 --
 12 files changed, 110 insertions(+), 140 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 9c5b4b1464..dc74b5c2dd 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -66,7 +66,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* 
storage_engine, Runti
                          const UniqueId& load_id)
         : _req(*req),
           _rowset_builder(*req, storage_engine, profile),
-          _memtable_writer(*req, profile),
+          _memtable_writer(new MemTableWriter(*req)),
           _storage_engine(storage_engine) {
     _init_profile(profile);
 }
@@ -83,10 +83,10 @@ DeltaWriter::~DeltaWriter() {
     }
 
     // cancel and wait all memtables in flush queue to be finished
-    _memtable_writer.cancel();
+    _memtable_writer->cancel();
 
     if (_rowset_builder.tablet() != nullptr) {
-        const FlushStatistic& stat = _memtable_writer.get_flush_token_stats();
+        const FlushStatistic& stat = _memtable_writer->get_flush_token_stats();
         
_rowset_builder.tablet()->flush_bytes->increment(stat.flush_size_bytes);
         
_rowset_builder.tablet()->flush_finish_count->increment(stat.flush_finish_count);
     }
@@ -94,8 +94,8 @@ DeltaWriter::~DeltaWriter() {
 
 Status DeltaWriter::init() {
     _rowset_builder.init();
-    _memtable_writer.init(_rowset_builder.rowset_writer(), 
_rowset_builder.tablet_schema(),
-                          
_rowset_builder.tablet()->enable_unique_key_merge_on_write());
+    _memtable_writer->init(_rowset_builder.rowset_writer(), 
_rowset_builder.tablet_schema(),
+                           
_rowset_builder.tablet()->enable_unique_key_merge_on_write());
     _is_init = true;
     return Status::OK();
 }
@@ -115,10 +115,10 @@ Status DeltaWriter::write(const vectorized::Block* block, 
const std::vector<int>
     if (!_is_init && !_is_cancelled) {
         RETURN_IF_ERROR(init());
     }
-    return _memtable_writer.write(block, row_idxs, is_append);
+    return _memtable_writer->write(block, row_idxs, is_append);
 }
 Status DeltaWriter::wait_flush() {
-    return _memtable_writer.wait_flush();
+    return _memtable_writer->wait_flush();
 }
 
 Status DeltaWriter::close() {
@@ -133,7 +133,7 @@ Status DeltaWriter::close() {
         // for this tablet when being closed.
         RETURN_IF_ERROR(init());
     }
-    return _memtable_writer.close();
+    return _memtable_writer->close();
 }
 
 Status DeltaWriter::build_rowset() {
@@ -142,7 +142,7 @@ Status DeltaWriter::build_rowset() {
             << "delta writer is supposed be to initialized before 
build_rowset() being called";
 
     SCOPED_TIMER(_close_wait_timer);
-    RETURN_IF_ERROR(_memtable_writer.close_wait());
+    RETURN_IF_ERROR(_memtable_writer->close_wait(_profile));
     return _rowset_builder.build_rowset();
 }
 
@@ -193,13 +193,13 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
     if (_is_cancelled) {
         return Status::OK();
     }
-    RETURN_IF_ERROR(_memtable_writer.cancel_with_status(st));
+    RETURN_IF_ERROR(_memtable_writer->cancel_with_status(st));
     _is_cancelled = true;
     return Status::OK();
 }
 
 int64_t DeltaWriter::mem_consumption(MemType mem) {
-    return _memtable_writer.mem_consumption(mem);
+    return _memtable_writer->mem_consumption(mem);
 }
 
 void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 4c9f0fc35a..764b23d2aa 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -112,7 +112,7 @@ public:
     // For UT
     DeleteBitmapPtr get_delete_bitmap() { return 
_rowset_builder.get_delete_bitmap(); }
 
-    MemTableWriter* memtable_writer() { return &_memtable_writer; }
+    std::shared_ptr<MemTableWriter> memtable_writer() { return 
_memtable_writer; }
 
 private:
     DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, 
RuntimeProfile* profile,
@@ -126,7 +126,7 @@ private:
     bool _is_cancelled = false;
     WriteRequest _req;
     RowsetBuilder _rowset_builder;
-    MemTableWriter _memtable_writer;
+    std::shared_ptr<MemTableWriter> _memtable_writer;
 
     StorageEngine* _storage_engine;
 
diff --git a/be/src/olap/memtable_memory_limiter.cpp 
b/be/src/olap/memtable_memory_limiter.cpp
index 54d124946e..6c3981696c 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -42,13 +42,6 @@ MemTableMemoryLimiter::MemTableMemoryLimiter() {}
 
 MemTableMemoryLimiter::~MemTableMemoryLimiter() {
     DEREGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption);
-    for (auto writer : _writers) {
-        if (writer != nullptr) {
-            delete writer;
-            writer = nullptr;
-        }
-    }
-    _writers.clear();
 }
 
 Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
@@ -61,14 +54,9 @@ Status MemTableMemoryLimiter::init(int64_t 
process_mem_limit) {
     return Status::OK();
 }
 
-void MemTableMemoryLimiter::register_writer(MemTableWriter* writer) {
+void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> 
writer) {
     std::lock_guard<std::mutex> l(_lock);
-    _writers.insert(writer);
-}
-
-void MemTableMemoryLimiter::deregister_writer(MemTableWriter* writer) {
-    std::lock_guard<std::mutex> l(_lock);
-    _writers.erase(writer);
+    _writers.push_back(writer);
 }
 
 void MemTableMemoryLimiter::handle_memtable_flush() {
@@ -115,15 +103,25 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
         };
         std::priority_queue<WriterMemItem, std::vector<WriterMemItem>, 
decltype(cmp)> mem_heap(cmp);
 
-        for (auto& writer : _writers) {
-            int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
-            mem_heap.emplace(writer, active_memtable_mem);
+        for (auto it = _writers.begin(); it != _writers.end();) {
+            if (auto writer = it->lock()) {
+                int64_t active_memtable_mem = 
writer->active_memtable_mem_consumption();
+                mem_heap.emplace(writer, active_memtable_mem);
+                ++it;
+            } else {
+                *it = std::move(_writers.back());
+                _writers.pop_back();
+            }
         }
         int64_t mem_to_flushed = _mem_tracker->consumption() / 10;
         int64_t mem_consumption_in_picked_writer = 0;
         while (!mem_heap.empty()) {
             WriterMemItem mem_item = mem_heap.top();
-            auto writer = mem_item.writer;
+            mem_heap.pop();
+            auto writer = mem_item.writer.lock();
+            if (!writer) {
+                continue;
+            }
             int64_t mem_size = mem_item.mem_size;
             writers_to_reduce_mem.emplace_back(writer, mem_size);
             st = writer->flush_memtable_and_wait(false);
@@ -139,7 +137,6 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
             if (mem_consumption_in_picked_writer > mem_to_flushed) {
                 break;
             }
-            mem_heap.pop();
         }
         if (writers_to_reduce_mem.empty()) {
             // should not happen, add log to observe
@@ -184,14 +181,18 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
     for (auto item : writers_to_reduce_mem) {
         VLOG_NOTICE << "reducing memory, wait flush mem_size: "
                     << PrettyPrinter::print_bytes(item.mem_size);
-        st = item.writer->wait_flush();
+        auto writer = item.writer.lock();
+        if (!writer) {
+            continue;
+        }
+        st = writer->wait_flush();
         if (!st.ok()) {
             auto err_msg = fmt::format(
                     "tablet writer failed to reduce mem consumption by 
flushing memtable, "
                     "tablet_id={}, err={}",
-                    item.writer->tablet_id(), st.to_string());
+                    writer->tablet_id(), st.to_string());
             LOG(WARNING) << err_msg;
-            item.writer->cancel_with_status(st);
+            writer->cancel_with_status(st);
         }
     }
 
@@ -213,9 +214,16 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
 
 void MemTableMemoryLimiter::_refresh_mem_tracker_without_lock() {
     _mem_usage = 0;
-    for (auto& writer : _writers) {
-        _mem_usage += writer->mem_consumption(MemType::ALL);
+    for (auto it = _writers.begin(); it != _writers.end();) {
+        if (auto writer = it->lock()) {
+            _mem_usage += writer->mem_consumption(MemType::ALL);
+            ++it;
+        } else {
+            *it = std::move(_writers.back());
+            _writers.pop_back();
+        }
     }
+    VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
     THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), 
_mem_tracker.get());
 }
 
diff --git a/be/src/olap/memtable_memory_limiter.h 
b/be/src/olap/memtable_memory_limiter.h
index 92b60e569b..ea66ce62e0 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -26,7 +26,7 @@
 namespace doris {
 class MemTableWriter;
 struct WriterMemItem {
-    MemTableWriter* writer;
+    std::weak_ptr<MemTableWriter> writer;
     int64_t mem_size;
 };
 class MemTableMemoryLimiter {
@@ -40,9 +40,7 @@ public:
     // If yes, it will flush memtable to try to reduce memory consumption.
     void handle_memtable_flush();
 
-    void register_writer(MemTableWriter* writer);
-
-    void deregister_writer(MemTableWriter* writer);
+    void register_writer(std::weak_ptr<MemTableWriter> writer);
 
     void refresh_mem_tracker() {
         std::lock_guard<std::mutex> l(_lock);
@@ -66,6 +64,6 @@ private:
     int64_t _load_soft_mem_limit = -1;
     bool _soft_reduce_mem_in_progress = false;
 
-    std::unordered_set<MemTableWriter*> _writers;
+    std::vector<std::weak_ptr<MemTableWriter>> _writers;
 };
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 8caddb6d37..5434e918e3 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -49,27 +49,7 @@
 namespace doris {
 using namespace ErrorCode;
 
-MemTableWriter::MemTableWriter(const WriteRequest& req, RuntimeProfile* 
profile) : _req(req) {
-    _init_profile(profile);
-}
-
-void MemTableWriter::_init_profile(RuntimeProfile* profile) {
-    _profile = profile->create_child(fmt::format("MemTableWriter {}", 
_req.tablet_id), true, true);
-    _lock_timer = ADD_TIMER(_profile, "LockTime");
-    _sort_timer = ADD_TIMER(_profile, "MemTableSortTime");
-    _agg_timer = ADD_TIMER(_profile, "MemTableAggTime");
-    _memtable_duration_timer = ADD_TIMER(_profile, "MemTableDurationTime");
-    _segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime");
-    _wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime");
-    _put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime");
-    _delete_bitmap_timer = ADD_TIMER(_profile, "DeleteBitmapTime");
-    _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
-    _sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT);
-    _agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT);
-    _segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT);
-    _raw_rows_num = ADD_COUNTER(_profile, "RawRowNum", TUnit::UNIT);
-    _merged_rows_num = ADD_COUNTER(_profile, "MergedRowNum", TUnit::UNIT);
-}
+MemTableWriter::MemTableWriter(const WriteRequest& req) : _req(req) {}
 
 MemTableWriter::~MemTableWriter() {
     if (!_is_init) {
@@ -175,7 +155,7 @@ Status MemTableWriter::flush_memtable_and_wait(bool 
need_wait) {
 
     if (need_wait) {
         // wait all memtables in flush queue to be flushed.
-        SCOPED_TIMER(_wait_flush_timer);
+        SCOPED_RAW_TIMER(&_wait_flush_time_ns);
         RETURN_IF_ERROR(_flush_token->wait());
     }
     return Status::OK();
@@ -193,7 +173,7 @@ Status MemTableWriter::wait_flush() {
             return _cancel_status;
         }
     }
-    SCOPED_TIMER(_wait_flush_timer);
+    SCOPED_RAW_TIMER(&_wait_flush_time_ns);
     RETURN_IF_ERROR(_flush_token->wait());
     return Status::OK();
 }
@@ -227,7 +207,7 @@ void MemTableWriter::_reset_mem_table() {
                                   _unique_key_mow, mem_table_insert_tracker,
                                   mem_table_flush_tracker));
 
-    COUNTER_UPDATE(_segment_num, 1);
+    _segment_num++;
 }
 
 Status MemTableWriter::close() {
@@ -256,8 +236,8 @@ Status MemTableWriter::close() {
     }
 }
 
-Status MemTableWriter::close_wait() {
-    SCOPED_TIMER(_close_wait_timer);
+Status MemTableWriter::_do_close_wait() {
+    SCOPED_RAW_TIMER(&_close_wait_time_ns);
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() 
being called";
@@ -269,7 +249,7 @@ Status MemTableWriter::close_wait() {
     Status st;
     // return error if previous flush failed
     {
-        SCOPED_TIMER(_wait_flush_timer);
+        SCOPED_RAW_TIMER(&_wait_flush_time_ns);
         st = _flush_token->wait();
     }
     if (UNLIKELY(!st.ok())) {
@@ -296,21 +276,46 @@ Status MemTableWriter::close_wait() {
                   << _wait_flush_timer->elapsed_time() << "(ns), stats: " << 
stat;
     }*/
 
-    COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
-    COUNTER_SET(_delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
-    COUNTER_SET(_segment_writer_timer, _rowset_writer->segment_writer_ns());
-    const auto& memtable_stat = _flush_token->memtable_stat();
-    COUNTER_SET(_sort_timer, memtable_stat.sort_ns);
-    COUNTER_SET(_agg_timer, memtable_stat.agg_ns);
-    COUNTER_SET(_memtable_duration_timer, memtable_stat.duration_ns);
-    COUNTER_SET(_put_into_output_timer, memtable_stat.put_into_output_ns);
-    COUNTER_SET(_sort_times, memtable_stat.sort_times);
-    COUNTER_SET(_agg_times, memtable_stat.agg_times);
-    COUNTER_SET(_raw_rows_num, memtable_stat.raw_rows);
-    COUNTER_SET(_merged_rows_num, memtable_stat.merged_rows);
     return Status::OK();
 }
 
+void MemTableWriter::_update_profile(RuntimeProfile* profile) {
+    // NOTE: MemTableWriter may be accessed when profile is out of scope, in 
MemTableMemoryLimiter.
+    // To avoid accessing dangling pointers, we cannot make profile as a 
member of MemTableWriter.
+    auto child =
+            profile->create_child(fmt::format("MemTableWriter {}", 
_req.tablet_id), true, true);
+    auto lock_timer = ADD_TIMER(child, "LockTime");
+    auto sort_timer = ADD_TIMER(child, "MemTableSortTime");
+    auto agg_timer = ADD_TIMER(child, "MemTableAggTime");
+    auto memtable_duration_timer = ADD_TIMER(child, "MemTableDurationTime");
+    auto segment_writer_timer = ADD_TIMER(child, "SegmentWriterTime");
+    auto wait_flush_timer = ADD_TIMER(child, "MemTableWaitFlushTime");
+    auto put_into_output_timer = ADD_TIMER(child, "MemTablePutIntoOutputTime");
+    auto delete_bitmap_timer = ADD_TIMER(child, "DeleteBitmapTime");
+    auto close_wait_timer = ADD_TIMER(child, "CloseWaitTime");
+    auto sort_times = ADD_COUNTER(child, "MemTableSortTimes", TUnit::UNIT);
+    auto agg_times = ADD_COUNTER(child, "MemTableAggTimes", TUnit::UNIT);
+    auto segment_num = ADD_COUNTER(child, "SegmentNum", TUnit::UNIT);
+    auto raw_rows_num = ADD_COUNTER(child, "RawRowNum", TUnit::UNIT);
+    auto merged_rows_num = ADD_COUNTER(child, "MergedRowNum", TUnit::UNIT);
+
+    COUNTER_UPDATE(lock_timer, _lock_watch.elapsed_time());
+    COUNTER_SET(delete_bitmap_timer, _rowset_writer->delete_bitmap_ns());
+    COUNTER_SET(segment_writer_timer, _rowset_writer->segment_writer_ns());
+    COUNTER_SET(wait_flush_timer, _wait_flush_time_ns);
+    COUNTER_SET(close_wait_timer, _close_wait_time_ns);
+    COUNTER_SET(segment_num, _segment_num);
+    const auto& memtable_stat = _flush_token->memtable_stat();
+    COUNTER_SET(sort_timer, memtable_stat.sort_ns);
+    COUNTER_SET(agg_timer, memtable_stat.agg_ns);
+    COUNTER_SET(memtable_duration_timer, memtable_stat.duration_ns);
+    COUNTER_SET(put_into_output_timer, memtable_stat.put_into_output_ns);
+    COUNTER_SET(sort_times, memtable_stat.sort_times);
+    COUNTER_SET(agg_times, memtable_stat.agg_times);
+    COUNTER_SET(raw_rows_num, memtable_stat.raw_rows);
+    COUNTER_SET(merged_rows_num, memtable_stat.merged_rows);
+}
+
 Status MemTableWriter::cancel() {
     return cancel_with_status(Status::Cancelled("already cancelled"));
 }
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index 457534ce3a..e9ec3dc6fb 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -62,7 +62,7 @@ enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 };
 // This class is NOT thread-safe, external synchronization is required.
 class MemTableWriter {
 public:
-    MemTableWriter(const WriteRequest& req, RuntimeProfile* profile);
+    MemTableWriter(const WriteRequest& req);
 
     ~MemTableWriter();
 
@@ -76,9 +76,15 @@ public:
 
     // flush the last memtable to flush queue, must call it before close_wait()
     Status close();
-    // wait for all memtables to be flushed.
+    // wait for all memtables to be flushed, update profiles if provided.
     // mem_consumption() should be 0 after this function returns.
-    Status close_wait();
+    Status close_wait(RuntimeProfile* profile = nullptr) {
+        RETURN_IF_ERROR(_do_close_wait());
+        if (profile != nullptr) {
+            _update_profile(profile);
+        }
+        return Status::OK();
+    }
 
     // abandon current memtable and wait for all pending-flushing memtables to 
be destructed.
     // mem_consumption() should be 0 after this function returns.
@@ -110,7 +116,8 @@ private:
 
     void _reset_mem_table();
 
-    void _init_profile(RuntimeProfile* profile);
+    Status _do_close_wait();
+    void _update_profile(RuntimeProfile* profile);
 
     std::atomic<bool> _is_init = false;
     bool _is_cancelled = false;
@@ -132,22 +139,9 @@ private:
 
     // total rows num written by MemTableWriter
     int64_t _total_received_rows = 0;
-
-    RuntimeProfile* _profile = nullptr;
-    RuntimeProfile::Counter* _lock_timer = nullptr;
-    RuntimeProfile::Counter* _sort_timer = nullptr;
-    RuntimeProfile::Counter* _agg_timer = nullptr;
-    RuntimeProfile::Counter* _wait_flush_timer = nullptr;
-    RuntimeProfile::Counter* _delete_bitmap_timer = nullptr;
-    RuntimeProfile::Counter* _segment_writer_timer = nullptr;
-    RuntimeProfile::Counter* _memtable_duration_timer = nullptr;
-    RuntimeProfile::Counter* _put_into_output_timer = nullptr;
-    RuntimeProfile::Counter* _sort_times = nullptr;
-    RuntimeProfile::Counter* _agg_times = nullptr;
-    RuntimeProfile::Counter* _close_wait_timer = nullptr;
-    RuntimeProfile::Counter* _segment_num = nullptr;
-    RuntimeProfile::Counter* _raw_rows_num = nullptr;
-    RuntimeProfile::Counter* _merged_rows_num = nullptr;
+    int64_t _wait_flush_time_ns = 0;
+    int64_t _close_wait_time_ns = 0;
+    int64_t _segment_num = 0;
 
     MonotonicStopWatch _lock_watch;
 };
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 41064110ea..f215b526ca 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -96,14 +96,6 @@ protected:
         _self_profile->add_info_string("EosHost", fmt::format("{}", 
request.backend_id()));
         bool finished = false;
         auto index_id = request.index_id();
-        // close will reset deltawriter memtable and should deregister writer 
before it.
-        {
-            std::lock_guard<SpinLock> l(_tablets_channels_lock);
-            auto tablet_channel_it = _tablets_channels.find(index_id);
-            if (tablet_channel_it != _tablets_channels.end()) {
-                
tablet_channel_it->second->deregister_memtable_memory_limiter();
-            }
-        }
 
         RETURN_IF_ERROR(channel->close(
                 this, request.sender_id(), request.backend_id(), &finished, 
request.partition_ids(),
@@ -141,7 +133,6 @@ private:
     // lock protect the tablets channel map
     std::mutex _lock;
     // index id -> tablets channel
-    // when you erase, you should call deregister_writer method in 
MemTableMemoryLimiter;
     std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> 
_tablets_channels;
     SpinLock _tablets_channels_lock;
     // This is to save finished channels id, to handle the retry request.
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 9a5c81144b..e41c1f01a1 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -166,7 +166,6 @@ Status LoadChannelMgr::add_batch(const 
PTabletWriterAddBlockRequest& request,
     // this case will be handled in load channel's add batch method.
     Status st = channel->add_batch(request, response);
     if (UNLIKELY(!st.ok())) {
-        _deregister_channel_all_writers(channel);
         channel->cancel();
         return st;
     }
@@ -183,7 +182,6 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId 
load_id) {
     {
         std::lock_guard<std::mutex> l(_lock);
         if (_load_channels.find(load_id) != _load_channels.end()) {
-            
_deregister_channel_all_writers(_load_channels.find(load_id)->second);
             _load_channels.erase(load_id);
         }
         auto handle = _last_success_channel->insert(load_id.to_string(), 
nullptr, 1, dummy_deleter);
@@ -199,7 +197,6 @@ Status LoadChannelMgr::cancel(const 
PTabletWriterCancelRequest& params) {
         std::lock_guard<std::mutex> l(_lock);
         if (_load_channels.find(load_id) != _load_channels.end()) {
             cancelled_channel = _load_channels[load_id];
-            _deregister_channel_all_writers(cancelled_channel);
             _load_channels.erase(load_id);
         }
     }
@@ -244,7 +241,6 @@ Status LoadChannelMgr::_start_load_channels_clean() {
         }
 
         for (auto& key : need_delete_channel_ids) {
-            _deregister_channel_all_writers(_load_channels.find(key)->second);
             _load_channels.erase(key);
             LOG(INFO) << "erase timeout load channel: " << key;
         }
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index e111a60a88..81991eee8c 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -77,17 +77,10 @@ private:
         }
     }
 
-    void _deregister_channel_all_writers(std::shared_ptr<doris::LoadChannel> 
channel) {
-        for (auto& [_, tablet_channel] : channel->get_tablets_channels()) {
-            tablet_channel->deregister_memtable_memory_limiter();
-        }
-    }
-
 protected:
     // lock protect the load channel map
     std::mutex _lock;
     // load id -> load channel
-    // when you erase, you should call deregister_writer method in 
MemTableMemoryLimiter ;
     std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
     Cache* _last_success_channel = nullptr;
 
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index ce82da4e1c..9d8346601b 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -68,9 +68,7 @@ TabletsChannel::TabletsChannel(const TabletsChannelKey& key, 
const UniqueId& loa
 
 TabletsChannel::~TabletsChannel() {
     _s_tablet_writer_count -= _tablet_writers.size();
-    auto memtable_memory_limiter = 
ExecEnv::GetInstance()->memtable_memory_limiter();
     for (auto& it : _tablet_writers) {
-        
memtable_memory_limiter->deregister_writer(it.second->memtable_writer());
         delete it.second;
     }
     delete _schema;
@@ -500,19 +498,13 @@ bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) 
{
 
 void TabletsChannel::register_memtable_memory_limiter() {
     auto memtable_memory_limiter = 
ExecEnv::GetInstance()->memtable_memory_limiter();
-    _memtable_writers_foreach([memtable_memory_limiter](MemTableWriter* 
writer) {
+    
_memtable_writers_foreach([memtable_memory_limiter](std::shared_ptr<MemTableWriter>
 writer) {
         memtable_memory_limiter->register_writer(writer);
     });
 }
 
-void TabletsChannel::deregister_memtable_memory_limiter() {
-    auto memtable_memory_limiter = 
ExecEnv::GetInstance()->memtable_memory_limiter();
-    _memtable_writers_foreach([memtable_memory_limiter](MemTableWriter* 
writer) {
-        memtable_memory_limiter->deregister_writer(writer);
-    });
-}
-
-void 
TabletsChannel::_memtable_writers_foreach(std::function<void(MemTableWriter*)> 
fn) {
+void TabletsChannel::_memtable_writers_foreach(
+        std::function<void(std::shared_ptr<MemTableWriter>)> fn) {
     std::lock_guard<SpinLock> l(_tablet_writers_lock);
     for (auto& [_, delta_writer] : _tablet_writers) {
         fn(delta_writer->memtable_writer());
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 29fd902ceb..ea8beed799 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -115,8 +115,6 @@ public:
 
     void register_memtable_memory_limiter();
 
-    void deregister_memtable_memory_limiter();
-
 private:
     template <typename Request>
     Status _get_current_seq(int64_t& cur_seq, const Request& request);
@@ -135,7 +133,7 @@ private:
                            int64_t tablet_id, Status error);
     bool _is_broken_tablet(int64_t tablet_id);
     void _init_profile(RuntimeProfile* profile);
-    void _memtable_writers_foreach(std::function<void(MemTableWriter*)> fn);
+    void 
_memtable_writers_foreach(std::function<void(std::shared_ptr<MemTableWriter>)> 
fn);
 
     // id of this load channel
     TabletsChannelKey _key;
@@ -170,7 +168,6 @@ private:
     Status _close_status;
 
     // tablet_id -> TabletChannel
-    // when you erase, you should call deregister_writer method in 
MemTableMemoryLimiter;
     std::unordered_map<int64_t, DeltaWriter*> _tablet_writers;
     // broken tablet ids.
     // If a tablet write fails, it's id will be added to this set.
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp 
b/be/test/olap/memtable_memory_limiter_test.cpp
index 7f13fa3c44..404a4fb61a 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -173,10 +173,6 @@ TEST_F(MemTableMemoryLimiterTest, 
handle_memtable_flush_test) {
     }
     _mgr->handle_memtable_flush();
     CHECK_EQ(0, memtable_writer->active_memtable_mem_consumption());
-    {
-        std::lock_guard<std::mutex> l(lock);
-        _mgr->deregister_writer(memtable_writer);
-    }
 
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to