This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f1e43fcaa4 [opt](cache) Support segment cache dynamic opening and
closing (#23659)
f1e43fcaa4 is described below
commit f1e43fcaa4d24742f99f4cac2097b80f975f3677
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Aug 31 18:48:26 2023 +0800
[opt](cache) Support segment cache dynamic opening and closing (#23659)
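Allow the segment cache to be disabled at runtime by updating the config. Each time
the cache is disabled, it is cleared only once (on the next run of the periodic
cache-clean task); while it stays disabled, segment lookups and inserts bypass the cache.
TODO: support the page cache and other caches.
For example:
curl -X POST http://xxxx:8040/api/update_config?disable_segment_cache=true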
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 2 +
be/src/olap/olap_server.cpp | 11 +++++
be/src/olap/page_cache.h | 13 +++---
.../rowset/segment_v2/inverted_index_cache.cpp | 2 +-
.../olap/rowset/segment_v2/inverted_index_cache.h | 5 ++-
be/src/olap/schema_cache.h | 2 +-
be/src/olap/segment_loader.cpp | 4 +-
be/src/olap/segment_loader.h | 2 +-
be/src/olap/storage_engine.h | 2 +
be/src/runtime/memory/cache_manager.cpp | 11 ++++-
be/src/runtime/memory/cache_manager.h | 5 ++-
be/src/runtime/memory/cache_policy.cpp | 34 +++++++++++++++
be/src/runtime/memory/cache_policy.h | 50 ++++++++++++++++++----
be/src/runtime/memory/lru_cache_policy.h | 31 ++++++++------
be/src/service/point_query_executor.h | 4 +-
be/test/testutil/run_all_tests.cpp | 1 +
17 files changed, 141 insertions(+), 39 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 873db8fd67..01fadad825 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -300,6 +300,7 @@ DEFINE_mInt32(trash_file_expire_time_sec, "259200");
// minimum file descriptor number
// modify them upon necessity
DEFINE_Int32(min_file_descriptor_number, "60000");
+DEFINE_mBool(disable_segment_cache, "false");
DEFINE_Int64(index_stream_cache_capacity, "10737418240");
DEFINE_String(row_cache_mem_limit, "20%");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9c452c8cfc..5c098b0806 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -347,6 +347,7 @@ DECLARE_mInt32(trash_file_expire_time_sec);
// minimum file descriptor number
// modify them upon necessity
DECLARE_Int32(min_file_descriptor_number);
+DECLARE_mBool(disable_segment_cache);
DECLARE_Int64(index_stream_cache_capacity);
DECLARE_String(row_cache_mem_limit);
@@ -359,6 +360,7 @@ DECLARE_Int32(storage_page_cache_shard_size);
// all storage page cache will be divided into data_page_cache and
index_page_cache
DECLARE_Int32(index_page_cache_percentage);
// whether to disable page cache feature in storage
+// TODO delete it. Divided into Data page, Index page, pk index page
DECLARE_Bool(disable_storage_page_cache);
// whether to disable row cache feature in storage
DECLARE_Bool(disable_storage_row_cache);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index e2840d29b4..d000e4eb48 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -259,6 +259,17 @@ void StorageEngine::_cache_clean_callback() {
}
CacheManager::instance()->for_each_cache_prune_stale();
+
+ // Dynamically modify the config to clear the cache, each time the
disable cache will only be cleared once.
+ // TODO, Support page cache and other caches.
+ if (config::disable_segment_cache) {
+ if (!_clear_segment_cache) {
+
CacheManager::instance()->clear_once(CachePolicy::CacheType::SEGMENT_CACHE);
+ _clear_segment_cache = true;
+ }
+ } else {
+ _clear_segment_cache = false;
+ }
}
}
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index 3f76546013..e1e48f2856 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -103,21 +103,24 @@ public:
class DataPageCache : public LRUCachePolicy {
public:
DataPageCache(size_t capacity, uint32_t num_shards)
- : LRUCachePolicy("DataPageCache", capacity, LRUCacheType::SIZE,
- config::data_page_cache_stale_sweep_time_sec,
num_shards) {}
+ : LRUCachePolicy(CachePolicy::CacheType::DATA_PAGE_CACHE,
capacity,
+ LRUCacheType::SIZE,
config::data_page_cache_stale_sweep_time_sec,
+ num_shards) {}
};
class IndexPageCache : public LRUCachePolicy {
public:
IndexPageCache(size_t capacity, uint32_t num_shards)
- : LRUCachePolicy("IndexPageCache", capacity,
LRUCacheType::SIZE,
-
config::index_page_cache_stale_sweep_time_sec, num_shards) {}
+ : LRUCachePolicy(CachePolicy::CacheType::INDEXPAGE_CACHE,
capacity,
+ LRUCacheType::SIZE,
config::index_page_cache_stale_sweep_time_sec,
+ num_shards) {}
};
class PKIndexPageCache : public LRUCachePolicy {
public:
PKIndexPageCache(size_t capacity, uint32_t num_shards)
- : LRUCachePolicy("PKIndexPageCache", capacity,
LRUCacheType::SIZE,
+ : LRUCachePolicy(CachePolicy::CacheType::PK_INDEX_PAGE_CACHE,
capacity,
+ LRUCacheType::SIZE,
config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {}
};
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index f3c68984eb..055365cf31 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -62,7 +62,7 @@ void
InvertedIndexSearcherCache::create_global_instance(size_t capacity, uint32_
}
InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity,
uint32_t num_shards)
- : LRUCachePolicy("InvertedIndexSearcherCache",
+ : LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE,
config::inverted_index_cache_stale_sweep_time_sec),
_mem_tracker(std::make_unique<MemTracker>("InvertedIndexSearcherCache")) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h
b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
index 9f368eca0c..388ee02ee9 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
@@ -237,8 +237,9 @@ public:
InvertedIndexQueryCache() = delete;
InvertedIndexQueryCache(size_t capacity, uint32_t num_shards)
- : LRUCachePolicy("InvertedIndexQueryCache", capacity,
LRUCacheType::SIZE,
-
config::inverted_index_cache_stale_sweep_time_sec, num_shards) {}
+ :
LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_QUERY_CACHE, capacity,
+ LRUCacheType::SIZE,
config::inverted_index_cache_stale_sweep_time_sec,
+ num_shards) {}
bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle);
diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h
index f34f7c296d..5d94c92837 100644
--- a/be/src/olap/schema_cache.h
+++ b/be/src/olap/schema_cache.h
@@ -117,7 +117,7 @@ public:
private:
SchemaCache(size_t capacity)
- : LRUCachePolicy("SchemaCache", capacity, LRUCacheType::NUMBER,
+ : LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity,
LRUCacheType::NUMBER,
config::schema_cache_sweep_time_sec) {}
static constexpr char SCHEMA_DELIMITER = '-';
static SchemaCache* _s_instance;
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 4704f8e802..a0350dcfc2 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -70,14 +70,14 @@ Status SegmentLoader::load_segments(const
BetaRowsetSharedPtr& rowset,
}
SegmentCache::CacheKey cache_key(rowset->rowset_id());
- if (_segment_cache->lookup(cache_key, cache_handle)) {
+ if (!config::disable_segment_cache && _segment_cache->lookup(cache_key,
cache_handle)) {
return Status::OK();
}
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(rowset->load_segments(&segments));
- if (use_cache) {
+ if (use_cache && !config::disable_segment_cache) {
// memory of SegmentCache::CacheValue will be handled by SegmentCache
SegmentCache::CacheValue* cache_value = new SegmentCache::CacheValue();
cache_value->segments = std::move(segments);
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 589a930201..ce0d00909a 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -73,7 +73,7 @@ public:
};
SegmentCache(size_t capacity)
- : LRUCachePolicy("SegmentCache", capacity, LRUCacheType::NUMBER,
+ : LRUCachePolicy(CachePolicy::CacheType::SEGMENT_CACHE, capacity,
LRUCacheType::NUMBER,
config::tablet_rowset_stale_sweep_time_sec) {}
// Lookup the given rowset in the cache.
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index a6ede08162..60db071e24 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -483,6 +483,8 @@ private:
scoped_refptr<Thread> _async_publish_thread;
std::mutex _async_publish_mutex;
+ bool _clear_segment_cache = false;
+
DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};
diff --git a/be/src/runtime/memory/cache_manager.cpp
b/be/src/runtime/memory/cache_manager.cpp
index 027ed81b16..eada36836d 100644
--- a/be/src/runtime/memory/cache_manager.cpp
+++ b/be/src/runtime/memory/cache_manager.cpp
@@ -43,7 +43,16 @@ int64_t
CacheManager::for_each_cache_prune_stale(RuntimeProfile* profile) {
int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile) {
return for_each_cache_prune_stale_wrap(
- [](CachePolicy* cache_policy) { cache_policy->prune_all(); },
profile);
+ [](CachePolicy* cache_policy) { cache_policy->prune_all(false); },
profile);
+}
+
+void CacheManager::clear_once(CachePolicy::CacheType type) {
+ std::lock_guard<std::mutex> l(_caches_lock);
+ for (auto cache_policy : _caches) {
+ if (cache_policy->type() == type) {
+ cache_policy->prune_all(true); // will print log
+ }
+ }
}
} // namespace doris
diff --git a/be/src/runtime/memory/cache_manager.h
b/be/src/runtime/memory/cache_manager.h
index 6086c02b94..fd7d5875b0 100644
--- a/be/src/runtime/memory/cache_manager.h
+++ b/be/src/runtime/memory/cache_manager.h
@@ -17,12 +17,11 @@
#pragma once
+#include "runtime/memory/cache_policy.h"
#include "util/runtime_profile.h"
namespace doris {
-class CachePolicy;
-
// Hold the list of all caches, for prune when memory not enough or timing.
class CacheManager {
public:
@@ -53,6 +52,8 @@ public:
int64_t for_each_cache_prune_all(RuntimeProfile* profile = nullptr);
+ void clear_once(CachePolicy::CacheType type);
+
private:
static inline CacheManager* _s_instance = nullptr;
diff --git a/be/src/runtime/memory/cache_policy.cpp
b/be/src/runtime/memory/cache_policy.cpp
new file mode 100644
index 0000000000..e79beaffa8
--- /dev/null
+++ b/be/src/runtime/memory/cache_policy.cpp
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/cache_policy.h"
+
+#include "runtime/memory/cache_manager.h"
+
+namespace doris {
+
+CachePolicy::CachePolicy(CacheType type, uint32_t stale_sweep_time_s)
+ : _type(type), _stale_sweep_time_s(stale_sweep_time_s) {
+ _it = CacheManager::instance()->register_cache(this);
+ init_profile();
+}
+
+CachePolicy::~CachePolicy() {
+ CacheManager::instance()->unregister_cache(_it);
+}
+
+} // namespace doris
diff --git a/be/src/runtime/memory/cache_policy.h
b/be/src/runtime/memory/cache_policy.h
index 14308088e6..008e5f4a47 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -17,7 +17,6 @@
#pragma once
-#include "runtime/memory/cache_manager.h"
#include "util/runtime_profile.h"
namespace doris {
@@ -27,21 +26,54 @@ static constexpr int32_t CACHE_MIN_FREE_SIZE = 67108864; //
64M
// Base of all caches. register to CacheManager when cache is constructed.
class CachePolicy {
public:
- CachePolicy(const std::string& name, uint32_t stale_sweep_time_s)
- : _name(name), _stale_sweep_time_s(stale_sweep_time_s) {
- _it = CacheManager::instance()->register_cache(this);
- init_profile();
+ enum class CacheType {
+ DATA_PAGE_CACHE = 0,
+ INDEXPAGE_CACHE = 1,
+ PK_INDEX_PAGE_CACHE = 2,
+ SCHEMA_CACHE = 3,
+ SEGMENT_CACHE = 4,
+ INVERTEDINDEX_SEARCHER_CACHE = 5,
+ INVERTEDINDEX_QUERY_CACHE = 6,
+ LOOKUP_CONNECTION_CACHE = 7
+ };
+
+ static std::string type_string(CacheType type) {
+ switch (type) {
+ case CacheType::DATA_PAGE_CACHE:
+ return "DataPageCache";
+ case CacheType::INDEXPAGE_CACHE:
+ return "IndexPageCache";
+ case CacheType::PK_INDEX_PAGE_CACHE:
+ return "PKIndexPageCache";
+ case CacheType::SCHEMA_CACHE:
+ return "SchemaCache";
+ case CacheType::SEGMENT_CACHE:
+ return "SegmentCache";
+ case CacheType::INVERTEDINDEX_SEARCHER_CACHE:
+ return "InvertedIndexSearcherCache";
+ case CacheType::INVERTEDINDEX_QUERY_CACHE:
+ return "InvertedIndexQueryCache";
+ case CacheType::LOOKUP_CONNECTION_CACHE:
+ return "LookupConnectionCache";
+ default:
+ LOG(FATAL) << "not match type of cache policy :" <<
static_cast<int>(type);
+ }
+ __builtin_unreachable();
}
- virtual ~CachePolicy() { CacheManager::instance()->unregister_cache(_it);
};
+ CachePolicy(CacheType type, uint32_t stale_sweep_time_s);
+ virtual ~CachePolicy();
+
virtual void prune_stale() = 0;
- virtual void prune_all() = 0;
+ virtual void prune_all(bool clear) = 0;
+ CacheType type() { return _type; }
RuntimeProfile* profile() { return _profile.get(); }
protected:
void init_profile() {
- _profile = std::make_unique<RuntimeProfile>(fmt::format("Cache
name={}", _name));
+ _profile =
+ std::make_unique<RuntimeProfile>(fmt::format("Cache type={}",
type_string(_type)));
_prune_stale_number_counter = ADD_COUNTER(_profile,
"PruneStaleNumber", TUnit::UNIT);
_prune_all_number_counter = ADD_COUNTER(_profile, "PruneAllNumber",
TUnit::UNIT);
_freed_memory_counter = ADD_COUNTER(_profile, "FreedMemory",
TUnit::BYTES);
@@ -49,7 +81,7 @@ protected:
_cost_timer = ADD_TIMER(_profile, "CostTime");
}
- std::string _name;
+ CacheType _type;
std::list<CachePolicy*>::iterator _it;
std::unique_ptr<RuntimeProfile> _profile;
diff --git a/be/src/runtime/memory/lru_cache_policy.h
b/be/src/runtime/memory/lru_cache_policy.h
index fd900bea6c..e7b9680eb6 100644
--- a/be/src/runtime/memory/lru_cache_policy.h
+++ b/be/src/runtime/memory/lru_cache_policy.h
@@ -34,14 +34,16 @@ struct LRUCacheValueBase {
// Base of lru cache, allow prune stale entry and prune all entry.
class LRUCachePolicy : public CachePolicy {
public:
- LRUCachePolicy(const std::string& name, uint32_t stale_sweep_time_s)
- : CachePolicy(name, stale_sweep_time_s) {};
- LRUCachePolicy(const std::string& name, size_t capacity, LRUCacheType type,
+ LRUCachePolicy(CacheType type, uint32_t stale_sweep_time_s)
+ : CachePolicy(type, stale_sweep_time_s) {};
+ LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType
lru_cache_type,
uint32_t stale_sweep_time_s, uint32_t num_shards = -1)
- : CachePolicy(name, stale_sweep_time_s) {
+ : CachePolicy(type, stale_sweep_time_s) {
_cache = num_shards == -1
- ? std::unique_ptr<Cache>(new_lru_cache(name,
capacity, type))
- : std::unique_ptr<Cache>(new_lru_cache(name,
capacity, type, num_shards));
+ ? std::unique_ptr<Cache>(
+ new_lru_cache(type_string(type), capacity,
lru_cache_type))
+ :
std::unique_ptr<Cache>(new_lru_cache(type_string(type), capacity,
+
lru_cache_type, num_shards));
}
~LRUCachePolicy() override = default;
@@ -66,23 +68,26 @@ public:
COUNTER_SET(_freed_entrys_counter, _cache->prune_if(pred, true));
COUNTER_SET(_freed_memory_counter, byte_size);
COUNTER_UPDATE(_prune_stale_number_counter, 1);
- LOG(INFO) << fmt::format("{} prune stale {} entries, {} bytes, {}
times prune", _name,
- _freed_entrys_counter->value(),
_freed_memory_counter->value(),
+ LOG(INFO) << fmt::format("{} prune stale {} entries, {} bytes, {}
times prune",
+ type_string(_type),
_freed_entrys_counter->value(),
+ _freed_memory_counter->value(),
_prune_stale_number_counter->value());
}
}
- void prune_all() override {
- if (_cache->mem_consumption() > CACHE_MIN_FREE_SIZE) {
+ void prune_all(bool clear) override {
+ if ((clear && _cache->mem_consumption() != 0) ||
+ _cache->mem_consumption() > CACHE_MIN_FREE_SIZE) {
COUNTER_SET(_cost_timer, (int64_t)0);
SCOPED_TIMER(_cost_timer);
auto size = _cache->mem_consumption();
COUNTER_SET(_freed_entrys_counter, _cache->prune());
COUNTER_SET(_freed_memory_counter, size);
COUNTER_UPDATE(_prune_all_number_counter, 1);
- LOG(INFO) << fmt::format("{} prune all {} entries, {} bytes, {}
times prune", _name,
- _freed_entrys_counter->value(),
_freed_memory_counter->value(),
- _prune_stale_number_counter->value());
+ LOG(INFO) << fmt::format(
+ "{} prune all {} entries, {} bytes, {} times prune, is
clear: {}",
+ type_string(_type), _freed_entrys_counter->value(),
+ _freed_memory_counter->value(),
_prune_stale_number_counter->value(), clear);
}
}
diff --git a/be/src/service/point_query_executor.h
b/be/src/service/point_query_executor.h
index 58ddba35c9..a49bfb442f 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -198,8 +198,8 @@ public:
private:
friend class PointQueryExecutor;
LookupConnectionCache(size_t capacity)
- : LRUCachePolicy("LookupConnectionCache", capacity,
LRUCacheType::SIZE,
- config::tablet_lookup_cache_clean_interval) {}
+ : LRUCachePolicy(CachePolicy::CacheType::LOOKUP_CONNECTION_CACHE,
capacity,
+ LRUCacheType::SIZE,
config::tablet_lookup_cache_clean_interval) {}
std::string encode_key(__int128_t cache_id) {
fmt::memory_buffer buffer;
diff --git a/be/test/testutil/run_all_tests.cpp
b/be/test/testutil/run_all_tests.cpp
index ab3b0b1ea0..2e735ce8fa 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -28,6 +28,7 @@
#include "olap/segment_loader.h"
#include "olap/tablet_schema_cache.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/cache_manager.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]