This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f1e43fcaa4 [opt](cache) Support segment cache dynamic opening and 
closing (#23659)
f1e43fcaa4 is described below

commit f1e43fcaa4d24742f99f4cac2097b80f975f3677
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Aug 31 18:48:26 2023 +0800

    [opt](cache) Support segment cache dynamic opening and closing (#23659)
    
    The cache can be cleared by dynamically modifying the config; each time the
    cache is disabled, it is cleared only once.
    TODO: support page cache and other caches.
    
    curl -X POST http://xxxx:8040/api/update_config?disable_segment_cache=true
---
 be/src/common/config.cpp                           |  1 +
 be/src/common/config.h                             |  2 +
 be/src/olap/olap_server.cpp                        | 11 +++++
 be/src/olap/page_cache.h                           | 13 +++---
 .../rowset/segment_v2/inverted_index_cache.cpp     |  2 +-
 .../olap/rowset/segment_v2/inverted_index_cache.h  |  5 ++-
 be/src/olap/schema_cache.h                         |  2 +-
 be/src/olap/segment_loader.cpp                     |  4 +-
 be/src/olap/segment_loader.h                       |  2 +-
 be/src/olap/storage_engine.h                       |  2 +
 be/src/runtime/memory/cache_manager.cpp            | 11 ++++-
 be/src/runtime/memory/cache_manager.h              |  5 ++-
 be/src/runtime/memory/cache_policy.cpp             | 34 +++++++++++++++
 be/src/runtime/memory/cache_policy.h               | 50 ++++++++++++++++++----
 be/src/runtime/memory/lru_cache_policy.h           | 31 ++++++++------
 be/src/service/point_query_executor.h              |  4 +-
 be/test/testutil/run_all_tests.cpp                 |  1 +
 17 files changed, 141 insertions(+), 39 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 873db8fd67..01fadad825 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -300,6 +300,7 @@ DEFINE_mInt32(trash_file_expire_time_sec, "259200");
 // minimum file descriptor number
 // modify them upon necessity
 DEFINE_Int32(min_file_descriptor_number, "60000");
+DEFINE_mBool(disable_segment_cache, "false");
 DEFINE_Int64(index_stream_cache_capacity, "10737418240");
 DEFINE_String(row_cache_mem_limit, "20%");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9c452c8cfc..5c098b0806 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -347,6 +347,7 @@ DECLARE_mInt32(trash_file_expire_time_sec);
 // minimum file descriptor number
 // modify them upon necessity
 DECLARE_Int32(min_file_descriptor_number);
+DECLARE_mBool(disable_segment_cache);
 DECLARE_Int64(index_stream_cache_capacity);
 DECLARE_String(row_cache_mem_limit);
 
@@ -359,6 +360,7 @@ DECLARE_Int32(storage_page_cache_shard_size);
 // all storage page cache will be divided into data_page_cache and 
index_page_cache
 DECLARE_Int32(index_page_cache_percentage);
 // whether to disable page cache feature in storage
+// TODO delete it. Divided into Data page, Index page, pk index page
 DECLARE_Bool(disable_storage_page_cache);
 // whether to disable row cache feature in storage
 DECLARE_Bool(disable_storage_row_cache);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index e2840d29b4..d000e4eb48 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -259,6 +259,17 @@ void StorageEngine::_cache_clean_callback() {
         }
 
         CacheManager::instance()->for_each_cache_prune_stale();
+
+        // Dynamically modify the config to clear the cache, each time the 
disable cache will only be cleared once.
+        // TODO, Support page cache and other caches.
+        if (config::disable_segment_cache) {
+            if (!_clear_segment_cache) {
+                
CacheManager::instance()->clear_once(CachePolicy::CacheType::SEGMENT_CACHE);
+                _clear_segment_cache = true;
+            }
+        } else {
+            _clear_segment_cache = false;
+        }
     }
 }
 
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index 3f76546013..e1e48f2856 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -103,21 +103,24 @@ public:
     class DataPageCache : public LRUCachePolicy {
     public:
         DataPageCache(size_t capacity, uint32_t num_shards)
-                : LRUCachePolicy("DataPageCache", capacity, LRUCacheType::SIZE,
-                                 config::data_page_cache_stale_sweep_time_sec, 
num_shards) {}
+                : LRUCachePolicy(CachePolicy::CacheType::DATA_PAGE_CACHE, 
capacity,
+                                 LRUCacheType::SIZE, 
config::data_page_cache_stale_sweep_time_sec,
+                                 num_shards) {}
     };
 
     class IndexPageCache : public LRUCachePolicy {
     public:
         IndexPageCache(size_t capacity, uint32_t num_shards)
-                : LRUCachePolicy("IndexPageCache", capacity, 
LRUCacheType::SIZE,
-                                 
config::index_page_cache_stale_sweep_time_sec, num_shards) {}
+                : LRUCachePolicy(CachePolicy::CacheType::INDEXPAGE_CACHE, 
capacity,
+                                 LRUCacheType::SIZE, 
config::index_page_cache_stale_sweep_time_sec,
+                                 num_shards) {}
     };
 
     class PKIndexPageCache : public LRUCachePolicy {
     public:
         PKIndexPageCache(size_t capacity, uint32_t num_shards)
-                : LRUCachePolicy("PKIndexPageCache", capacity, 
LRUCacheType::SIZE,
+                : LRUCachePolicy(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE, 
capacity,
+                                 LRUCacheType::SIZE,
                                  
config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {}
     };
 
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index f3c68984eb..055365cf31 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -62,7 +62,7 @@ void 
InvertedIndexSearcherCache::create_global_instance(size_t capacity, uint32_
 }
 
 InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, 
uint32_t num_shards)
-        : LRUCachePolicy("InvertedIndexSearcherCache",
+        : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE,
                          config::inverted_index_cache_stale_sweep_time_sec),
           
_mem_tracker(std::make_unique<MemTracker>("InvertedIndexSearcherCache")) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h 
b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
index 9f368eca0c..388ee02ee9 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
@@ -237,8 +237,9 @@ public:
     InvertedIndexQueryCache() = delete;
 
     InvertedIndexQueryCache(size_t capacity, uint32_t num_shards)
-            : LRUCachePolicy("InvertedIndexQueryCache", capacity, 
LRUCacheType::SIZE,
-                             
config::inverted_index_cache_stale_sweep_time_sec, num_shards) {}
+            : 
LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, capacity,
+                             LRUCacheType::SIZE, 
config::inverted_index_cache_stale_sweep_time_sec,
+                             num_shards) {}
 
     bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle);
 
diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h
index f34f7c296d..5d94c92837 100644
--- a/be/src/olap/schema_cache.h
+++ b/be/src/olap/schema_cache.h
@@ -117,7 +117,7 @@ public:
 
 private:
     SchemaCache(size_t capacity)
-            : LRUCachePolicy("SchemaCache", capacity, LRUCacheType::NUMBER,
+            : LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity, 
LRUCacheType::NUMBER,
                              config::schema_cache_sweep_time_sec) {}
     static constexpr char SCHEMA_DELIMITER = '-';
     static SchemaCache* _s_instance;
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 4704f8e802..a0350dcfc2 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -70,14 +70,14 @@ Status SegmentLoader::load_segments(const 
BetaRowsetSharedPtr& rowset,
     }
 
     SegmentCache::CacheKey cache_key(rowset->rowset_id());
-    if (_segment_cache->lookup(cache_key, cache_handle)) {
+    if (!config::disable_segment_cache && _segment_cache->lookup(cache_key, 
cache_handle)) {
         return Status::OK();
     }
 
     std::vector<segment_v2::SegmentSharedPtr> segments;
     RETURN_IF_ERROR(rowset->load_segments(&segments));
 
-    if (use_cache) {
+    if (use_cache && !config::disable_segment_cache) {
         // memory of SegmentCache::CacheValue will be handled by SegmentCache
         SegmentCache::CacheValue* cache_value = new SegmentCache::CacheValue();
         cache_value->segments = std::move(segments);
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 589a930201..ce0d00909a 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -73,7 +73,7 @@ public:
     };
 
     SegmentCache(size_t capacity)
-            : LRUCachePolicy("SegmentCache", capacity, LRUCacheType::NUMBER,
+            : LRUCachePolicy(CachePolicy::CacheType::SEGMENT_CACHE, capacity, 
LRUCacheType::NUMBER,
                              config::tablet_rowset_stale_sweep_time_sec) {}
 
     // Lookup the given rowset in the cache.
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index a6ede08162..60db071e24 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -483,6 +483,8 @@ private:
     scoped_refptr<Thread> _async_publish_thread;
     std::mutex _async_publish_mutex;
 
+    bool _clear_segment_cache = false;
+
     DISALLOW_COPY_AND_ASSIGN(StorageEngine);
 };
 
diff --git a/be/src/runtime/memory/cache_manager.cpp 
b/be/src/runtime/memory/cache_manager.cpp
index 027ed81b16..eada36836d 100644
--- a/be/src/runtime/memory/cache_manager.cpp
+++ b/be/src/runtime/memory/cache_manager.cpp
@@ -43,7 +43,16 @@ int64_t 
CacheManager::for_each_cache_prune_stale(RuntimeProfile* profile) {
 
 int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile) {
     return for_each_cache_prune_stale_wrap(
-            [](CachePolicy* cache_policy) { cache_policy->prune_all(); }, 
profile);
+            [](CachePolicy* cache_policy) { cache_policy->prune_all(false); }, 
profile);
+}
+
+void CacheManager::clear_once(CachePolicy::CacheType type) {
+    std::lock_guard<std::mutex> l(_caches_lock);
+    for (auto cache_policy : _caches) {
+        if (cache_policy->type() == type) {
+            cache_policy->prune_all(true); // will print log
+        }
+    }
 }
 
 } // namespace doris
diff --git a/be/src/runtime/memory/cache_manager.h 
b/be/src/runtime/memory/cache_manager.h
index 6086c02b94..fd7d5875b0 100644
--- a/be/src/runtime/memory/cache_manager.h
+++ b/be/src/runtime/memory/cache_manager.h
@@ -17,12 +17,11 @@
 
 #pragma once
 
+#include "runtime/memory/cache_policy.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
 
-class CachePolicy;
-
 // Hold the list of all caches, for prune when memory not enough or timing.
 class CacheManager {
 public:
@@ -53,6 +52,8 @@ public:
 
     int64_t for_each_cache_prune_all(RuntimeProfile* profile = nullptr);
 
+    void clear_once(CachePolicy::CacheType type);
+
 private:
     static inline CacheManager* _s_instance = nullptr;
 
diff --git a/be/src/runtime/memory/cache_policy.cpp 
b/be/src/runtime/memory/cache_policy.cpp
new file mode 100644
index 0000000000..e79beaffa8
--- /dev/null
+++ b/be/src/runtime/memory/cache_policy.cpp
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/cache_policy.h"
+
+#include "runtime/memory/cache_manager.h"
+
+namespace doris {
+
+CachePolicy::CachePolicy(CacheType type, uint32_t stale_sweep_time_s)
+        : _type(type), _stale_sweep_time_s(stale_sweep_time_s) {
+    _it = CacheManager::instance()->register_cache(this);
+    init_profile();
+}
+
+CachePolicy::~CachePolicy() {
+    CacheManager::instance()->unregister_cache(_it);
+}
+
+} // namespace doris
diff --git a/be/src/runtime/memory/cache_policy.h 
b/be/src/runtime/memory/cache_policy.h
index 14308088e6..008e5f4a47 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include "runtime/memory/cache_manager.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
@@ -27,21 +26,54 @@ static constexpr int32_t CACHE_MIN_FREE_SIZE = 67108864; // 
64M
 // Base of all caches. register to CacheManager when cache is constructed.
 class CachePolicy {
 public:
-    CachePolicy(const std::string& name, uint32_t stale_sweep_time_s)
-            : _name(name), _stale_sweep_time_s(stale_sweep_time_s) {
-        _it = CacheManager::instance()->register_cache(this);
-        init_profile();
+    enum class CacheType {
+        DATA_PAGE_CACHE = 0,
+        INDEXPAGE_CACHE = 1,
+        PK_INDEX_PAGE_CACHE = 2,
+        SCHEMA_CACHE = 3,
+        SEGMENT_CACHE = 4,
+        INVERTEDINDEX_SEARCHER_CACHE = 5,
+        INVERTEDINDEX_QUERY_CACHE = 6,
+        LOOKUP_CONNECTION_CACHE = 7
+    };
+
+    static std::string type_string(CacheType type) {
+        switch (type) {
+        case CacheType::DATA_PAGE_CACHE:
+            return "DataPageCache";
+        case CacheType::INDEXPAGE_CACHE:
+            return "IndexPageCache";
+        case CacheType::PK_INDEX_PAGE_CACHE:
+            return "PKIndexPageCache";
+        case CacheType::SCHEMA_CACHE:
+            return "SchemaCache";
+        case CacheType::SEGMENT_CACHE:
+            return "SegmentCache";
+        case CacheType::INVERTEDINDEX_SEARCHER_CACHE:
+            return "InvertedIndexSearcherCache";
+        case CacheType::INVERTEDINDEX_QUERY_CACHE:
+            return "InvertedIndexQueryCache";
+        case CacheType::LOOKUP_CONNECTION_CACHE:
+            return "LookupConnectionCache";
+        default:
+            LOG(FATAL) << "not match type of cache policy :" << 
static_cast<int>(type);
+        }
+        __builtin_unreachable();
     }
 
-    virtual ~CachePolicy() { CacheManager::instance()->unregister_cache(_it); 
};
+    CachePolicy(CacheType type, uint32_t stale_sweep_time_s);
+    virtual ~CachePolicy();
+
     virtual void prune_stale() = 0;
-    virtual void prune_all() = 0;
+    virtual void prune_all(bool clear) = 0;
 
+    CacheType type() { return _type; }
     RuntimeProfile* profile() { return _profile.get(); }
 
 protected:
     void init_profile() {
-        _profile = std::make_unique<RuntimeProfile>(fmt::format("Cache 
name={}", _name));
+        _profile =
+                std::make_unique<RuntimeProfile>(fmt::format("Cache type={}", 
type_string(_type)));
         _prune_stale_number_counter = ADD_COUNTER(_profile, 
"PruneStaleNumber", TUnit::UNIT);
         _prune_all_number_counter = ADD_COUNTER(_profile, "PruneAllNumber", 
TUnit::UNIT);
         _freed_memory_counter = ADD_COUNTER(_profile, "FreedMemory", 
TUnit::BYTES);
@@ -49,7 +81,7 @@ protected:
         _cost_timer = ADD_TIMER(_profile, "CostTime");
     }
 
-    std::string _name;
+    CacheType _type;
     std::list<CachePolicy*>::iterator _it;
 
     std::unique_ptr<RuntimeProfile> _profile;
diff --git a/be/src/runtime/memory/lru_cache_policy.h 
b/be/src/runtime/memory/lru_cache_policy.h
index fd900bea6c..e7b9680eb6 100644
--- a/be/src/runtime/memory/lru_cache_policy.h
+++ b/be/src/runtime/memory/lru_cache_policy.h
@@ -34,14 +34,16 @@ struct LRUCacheValueBase {
 // Base of lru cache, allow prune stale entry and prune all entry.
 class LRUCachePolicy : public CachePolicy {
 public:
-    LRUCachePolicy(const std::string& name, uint32_t stale_sweep_time_s)
-            : CachePolicy(name, stale_sweep_time_s) {};
-    LRUCachePolicy(const std::string& name, size_t capacity, LRUCacheType type,
+    LRUCachePolicy(CacheType type, uint32_t stale_sweep_time_s)
+            : CachePolicy(type, stale_sweep_time_s) {};
+    LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType 
lru_cache_type,
                    uint32_t stale_sweep_time_s, uint32_t num_shards = -1)
-            : CachePolicy(name, stale_sweep_time_s) {
+            : CachePolicy(type, stale_sweep_time_s) {
         _cache = num_shards == -1
-                         ? std::unique_ptr<Cache>(new_lru_cache(name, 
capacity, type))
-                         : std::unique_ptr<Cache>(new_lru_cache(name, 
capacity, type, num_shards));
+                         ? std::unique_ptr<Cache>(
+                                   new_lru_cache(type_string(type), capacity, 
lru_cache_type))
+                         : 
std::unique_ptr<Cache>(new_lru_cache(type_string(type), capacity,
+                                                                
lru_cache_type, num_shards));
     }
 
     ~LRUCachePolicy() override = default;
@@ -66,23 +68,26 @@ public:
             COUNTER_SET(_freed_entrys_counter, _cache->prune_if(pred, true));
             COUNTER_SET(_freed_memory_counter, byte_size);
             COUNTER_UPDATE(_prune_stale_number_counter, 1);
-            LOG(INFO) << fmt::format("{} prune stale {} entries, {} bytes, {} 
times prune", _name,
-                                     _freed_entrys_counter->value(), 
_freed_memory_counter->value(),
+            LOG(INFO) << fmt::format("{} prune stale {} entries, {} bytes, {} 
times prune",
+                                     type_string(_type), 
_freed_entrys_counter->value(),
+                                     _freed_memory_counter->value(),
                                      _prune_stale_number_counter->value());
         }
     }
 
-    void prune_all() override {
-        if (_cache->mem_consumption() > CACHE_MIN_FREE_SIZE) {
+    void prune_all(bool clear) override {
+        if ((clear && _cache->mem_consumption() != 0) ||
+            _cache->mem_consumption() > CACHE_MIN_FREE_SIZE) {
             COUNTER_SET(_cost_timer, (int64_t)0);
             SCOPED_TIMER(_cost_timer);
             auto size = _cache->mem_consumption();
             COUNTER_SET(_freed_entrys_counter, _cache->prune());
             COUNTER_SET(_freed_memory_counter, size);
             COUNTER_UPDATE(_prune_all_number_counter, 1);
-            LOG(INFO) << fmt::format("{} prune all {} entries, {} bytes, {} 
times prune", _name,
-                                     _freed_entrys_counter->value(), 
_freed_memory_counter->value(),
-                                     _prune_stale_number_counter->value());
+            LOG(INFO) << fmt::format(
+                    "{} prune all {} entries, {} bytes, {} times prune, is 
clear: {}",
+                    type_string(_type), _freed_entrys_counter->value(),
+                    _freed_memory_counter->value(), 
_prune_stale_number_counter->value(), clear);
         }
     }
 
diff --git a/be/src/service/point_query_executor.h 
b/be/src/service/point_query_executor.h
index 58ddba35c9..a49bfb442f 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -198,8 +198,8 @@ public:
 private:
     friend class PointQueryExecutor;
     LookupConnectionCache(size_t capacity)
-            : LRUCachePolicy("LookupConnectionCache", capacity, 
LRUCacheType::SIZE,
-                             config::tablet_lookup_cache_clean_interval) {}
+            : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE, 
capacity,
+                             LRUCacheType::SIZE, 
config::tablet_lookup_cache_clean_interval) {}
 
     std::string encode_key(__int128_t cache_id) {
         fmt::memory_buffer buffer;
diff --git a/be/test/testutil/run_all_tests.cpp 
b/be/test/testutil/run_all_tests.cpp
index ab3b0b1ea0..2e735ce8fa 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -28,6 +28,7 @@
 #include "olap/segment_loader.h"
 #include "olap/tablet_schema_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/cache_manager.h"
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to