This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c7ae2a7d22 [Refactor & Bugfix](static variables) move some static
variables to exec_env (#24029)
c7ae2a7d22 is described below
commit c7ae2a7d220429a39c52ded077c0d5b484223f38
Author: zhiqqqq <[email protected]>
AuthorDate: Wed Sep 13 09:27:03 2023 +0800
[Refactor & Bugfix](static variables) move some static variables to
exec_env (#24029)
---
be/src/common/daemon.cpp | 7 +-
be/src/common/daemon.h | 2 +-
be/src/common/resource_tls.cpp | 69 ------
be/src/common/resource_tls.h | 30 ---
be/src/http/action/file_cache_action.cpp | 4 +-
be/src/io/cache/block/block_file_cache_factory.cpp | 6 +-
be/src/io/cache/block/block_file_cache_factory.h | 2 +-
.../io/cache/block/cached_remote_file_reader.cpp | 8 +-
be/src/io/fs/s3_file_write_bufferpool.h | 4 +-
be/src/olap/olap_define.h | 14 ++
be/src/olap/page_cache.cpp | 18 +-
be/src/olap/page_cache.h | 9 +-
.../rowset/segment_v2/inverted_index_cache.cpp | 13 +-
.../olap/rowset/segment_v2/inverted_index_cache.h | 29 ++-
be/src/olap/schema_cache.cpp | 10 +-
be/src/olap/schema_cache.h | 10 +-
be/src/olap/segment_loader.cpp | 8 +-
be/src/olap/segment_loader.h | 9 +-
be/src/olap/storage_engine.cpp | 67 +++---
be/src/olap/storage_engine.h | 9 +-
be/src/olap/tablet_schema_cache.cpp | 12 +-
be/src/olap/tablet_schema_cache.h | 22 +-
be/src/pipeline/task_scheduler.cpp | 4 +-
be/src/pipeline/task_scheduler.h | 2 +-
be/src/runtime/broker_mgr.cpp | 2 +-
be/src/runtime/broker_mgr.h | 3 +-
be/src/runtime/exec_env.cpp | 5 +-
be/src/runtime/exec_env.h | 102 +++++++--
be/src/runtime/exec_env_init.cpp | 240 +++++++++++++++++----
be/src/runtime/external_scan_context_mgr.cpp | 2 +-
be/src/runtime/external_scan_context_mgr.h | 4 +-
be/src/runtime/fragment_mgr.cpp | 4 +-
be/src/runtime/fragment_mgr.h | 2 +
be/src/runtime/frontend_info.h | 1 +
be/src/runtime/group_commit_mgr.cpp | 9 +-
be/src/runtime/group_commit_mgr.h | 2 +
be/src/runtime/load_channel_mgr.cpp | 5 +-
be/src/runtime/load_channel_mgr.h | 2 +
be/src/runtime/load_path_mgr.cpp | 2 +-
be/src/runtime/load_path_mgr.h | 3 +-
be/src/runtime/memory/cache_manager.h | 12 +-
be/src/runtime/result_buffer_mgr.cpp | 2 +-
be/src/runtime/result_buffer_mgr.h | 5 +-
be/src/runtime/routine_load/data_consumer_pool.h | 4 +-
.../routine_load/routine_load_task_executor.cpp | 9 +-
.../routine_load/routine_load_task_executor.h | 2 +
be/src/runtime/task_group/task_group_manager.cpp | 3 -
be/src/runtime/task_group/task_group_manager.h | 4 +-
be/src/runtime/user_function_cache.cpp | 4 +-
be/src/service/brpc_service.cpp | 12 +-
be/src/service/doris_main.cpp | 112 +++-------
be/src/service/http_service.cpp | 4 +
be/src/service/http_service.h | 2 +
be/src/service/point_query_executor.cpp | 22 +-
be/src/service/point_query_executor.h | 12 +-
be/src/vec/exec/scan/scanner_scheduler.cpp | 18 +-
be/src/vec/exec/scan/scanner_scheduler.h | 4 +-
be/test/common/resource_tls_test.cpp | 49 -----
be/test/olap/delete_bitmap_calculator_test.cpp | 4 +-
be/test/olap/delete_handler_test.cpp | 7 +-
be/test/olap/delta_writer_test.cpp | 5 +-
.../olap/engine_storage_migration_task_test.cpp | 7 +-
be/test/olap/memtable_flush_executor_test.cpp | 6 +-
be/test/olap/memtable_memory_limiter_test.cpp | 12 +-
be/test/olap/ordered_data_compaction_test.cpp | 5 +-
be/test/olap/rowid_conversion_test.cpp | 4 +-
be/test/olap/rowset/beta_rowset_test.cpp | 9 +-
be/test/olap/rowset/rowset_meta_manager_test.cpp | 3 +
be/test/olap/tablet_cooldown_test.cpp | 6 +-
be/test/olap/tablet_mgr_test.cpp | 3 +
be/test/olap/tablet_test.cpp | 4 +-
be/test/olap/txn_manager_test.cpp | 1 +
be/test/runtime/external_scan_context_mgr_test.cpp | 8 +-
be/test/runtime/load_stream_test.cpp | 8 +-
.../runtime/routine_load_task_executor_test.cpp | 5 +-
be/test/testutil/run_all_tests.cpp | 15 +-
be/test/vec/olap/vertical_compaction_test.cpp | 4 +-
77 files changed, 636 insertions(+), 516 deletions(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index c17bf1367f..43a3445382 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -63,10 +63,6 @@
namespace doris {
-Daemon::~Daemon() {
- stop();
-}
-
void Daemon::tcmalloc_gc_thread() {
// TODO All cache GC wish to be supported
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) &&
!defined(THREAD_SANITIZER) && \
@@ -389,7 +385,9 @@ void Daemon::start() {
}
void Daemon::stop() {
+ LOG(INFO) << "Doris daemon is stopping.";
if (_stop_background_threads_latch.count() == 0) {
+ LOG(INFO) << "Doris daemon stop returned since no bg threads latch.";
return;
}
_stop_background_threads_latch.count_down();
@@ -398,6 +396,7 @@ void Daemon::stop() {
t->join();
}
}
+ LOG(INFO) << "Doris daemon stopped after background threads are joined.";
}
} // namespace doris
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index c552f55f03..139584ba93 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -28,7 +28,7 @@ namespace doris {
class Daemon {
public:
Daemon() : _stop_background_threads_latch(1) {}
- ~Daemon();
+ ~Daemon() = default;
// Start background threads
void start();
diff --git a/be/src/common/resource_tls.cpp b/be/src/common/resource_tls.cpp
deleted file mode 100644
index 9b5ddc6815..0000000000
--- a/be/src/common/resource_tls.cpp
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "common/resource_tls.h"
-
-#include <gen_cpp/Types_types.h>
-#include <pthread.h>
-
-#include <ostream>
-
-#include "common/logging.h"
-
-namespace doris {
-
-static pthread_key_t s_resource_key;
-static bool s_is_init = false;
-
-static void resource_destructor(void* value) {
- TResourceInfo* info = (TResourceInfo*)value;
- if (info != nullptr) {
- delete info;
- }
-}
-
-void ResourceTls::init() {
- int ret = pthread_key_create(&s_resource_key, resource_destructor);
- if (ret != 0) {
- LOG(ERROR) << "create pthread key for resource failed.";
- return;
- }
- s_is_init = true;
-}
-
-TResourceInfo* ResourceTls::get_resource_tls() {
- if (!s_is_init) {
- return nullptr;
- }
- return (TResourceInfo*)pthread_getspecific(s_resource_key);
-}
-
-int ResourceTls::set_resource_tls(TResourceInfo* info) {
- if (!s_is_init) {
- return -1;
- }
- TResourceInfo* old_info =
(TResourceInfo*)pthread_getspecific(s_resource_key);
-
- int ret = pthread_setspecific(s_resource_key, info);
- if (ret == 0) {
- // OK, now we delete old one
- delete old_info;
- }
- return ret;
-}
-
-} // namespace doris
diff --git a/be/src/common/resource_tls.h b/be/src/common/resource_tls.h
deleted file mode 100644
index deed16496d..0000000000
--- a/be/src/common/resource_tls.h
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-namespace doris {
-
-class TResourceInfo;
-class ResourceTls {
-public:
- static void init();
- static TResourceInfo* get_resource_tls();
- static int set_resource_tls(TResourceInfo*);
-};
-
-} // namespace doris
diff --git a/be/src/http/action/file_cache_action.cpp
b/be/src/http/action/file_cache_action.cpp
index db432d5598..c55e639eea 100644
--- a/be/src/http/action/file_cache_action.cpp
+++ b/be/src/http/action/file_cache_action.cpp
@@ -42,9 +42,9 @@ Status FileCacheAction::_handle_header(HttpRequest* req,
std::string* json_metri
if (operation == "release") {
size_t released = 0;
if (req->param("base_path") != "") {
- released =
io::FileCacheFactory::instance().try_release(req->param("base_path"));
+ released =
io::FileCacheFactory::instance()->try_release(req->param("base_path"));
} else {
- released = io::FileCacheFactory::instance().try_release();
+ released = io::FileCacheFactory::instance()->try_release();
}
EasyJson json;
json["released_elements"] = released;
diff --git a/be/src/io/cache/block/block_file_cache_factory.cpp
b/be/src/io/cache/block/block_file_cache_factory.cpp
index 8741ced5c0..6bbbba8023 100644
--- a/be/src/io/cache/block/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block/block_file_cache_factory.cpp
@@ -31,15 +31,15 @@
#include "io/cache/block/block_file_cache_settings.h"
#include "io/cache/block/block_lru_file_cache.h"
#include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
namespace doris {
class TUniqueId;
namespace io {
-FileCacheFactory& FileCacheFactory::instance() {
- static FileCacheFactory ret;
- return ret;
+FileCacheFactory* FileCacheFactory::instance() {
+ return ExecEnv::GetInstance()->file_cache_factory();
}
size_t FileCacheFactory::try_release() {
diff --git a/be/src/io/cache/block/block_file_cache_factory.h
b/be/src/io/cache/block/block_file_cache_factory.h
index c8ed2893f1..0b6b6504c9 100644
--- a/be/src/io/cache/block/block_file_cache_factory.h
+++ b/be/src/io/cache/block/block_file_cache_factory.h
@@ -37,7 +37,7 @@ namespace io {
*/
class FileCacheFactory {
public:
- static FileCacheFactory& instance();
+ static FileCacheFactory* instance();
void create_file_cache(const std::string& cache_base_path,
const FileCacheSettings& file_cache_settings,
Status* status);
diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp
b/be/src/io/cache/block/cached_remote_file_reader.cpp
index f1662418cf..86ecb14a75 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -47,21 +47,21 @@
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
_is_doris_table = opts.is_doris_table;
if (_is_doris_table) {
_cache_key = IFileCache::hash(path().filename().native());
- _cache = FileCacheFactory::instance().get_by_path(_cache_key);
+ _cache = FileCacheFactory::instance()->get_by_path(_cache_key);
} else {
// Use path and modification time to build cache key
std::string unique_path = fmt::format("{}:{}", path().native(),
opts.mtime);
_cache_key = IFileCache::hash(unique_path);
if (!opts.cache_base_path.empty()) {
// from query session variable: file_cache_base_path
- _cache =
FileCacheFactory::instance().get_by_path(opts.cache_base_path);
+ _cache =
FileCacheFactory::instance()->get_by_path(opts.cache_base_path);
if (_cache == nullptr) {
LOG(WARNING) << "Can't get cache from base path: " <<
opts.cache_base_path
<< ", using random instead.";
- _cache = FileCacheFactory::instance().get_by_path(_cache_key);
+ _cache = FileCacheFactory::instance()->get_by_path(_cache_key);
}
}
- _cache = FileCacheFactory::instance().get_by_path(path().native());
+ _cache = FileCacheFactory::instance()->get_by_path(path().native());
}
}
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h
b/be/src/io/fs/s3_file_write_bufferpool.h
index ad5f698f98..7e8bf01e19 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -28,6 +28,7 @@
#include "common/config.h"
#include "common/status.h"
#include "io/fs/s3_common.h"
+#include "runtime/exec_env.h"
#include "util/slice.h"
namespace doris {
@@ -126,8 +127,7 @@ public:
doris::ThreadPool* thread_pool);
static S3FileBufferPool* GetInstance() {
- static S3FileBufferPool _pool;
- return &_pool;
+ return ExecEnv::GetInstance()->get_s3_file_buffer_pool();
}
void reclaim(Slice buf) {
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index e0e1d919a5..901e0403f0 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -179,6 +179,20 @@ const std::string REMOTE_TABLET_GC_PREFIX = "tgc_";
} \
} while (0)
+#define SAFE_STOP(ptr) \
+ do { \
+ if (nullptr != ptr) { \
+ ptr->stop(); \
+ } \
+ } while (0)
+
+#define SAFE_SHUTDOWN(ptr) \
+ do { \
+ if (nullptr != ptr) { \
+ ptr->shutdown(); \
+ } \
+ } while (0)
+
#ifndef BUILD_VERSION
#define BUILD_VERSION "Unknown"
#endif
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index 57049bdc6d..1779c293f8 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -21,16 +21,16 @@
#include <ostream>
-namespace doris {
-
-StoragePageCache* StoragePageCache::_s_instance = nullptr;
+#include "runtime/exec_env.h"
-void StoragePageCache::create_global_cache(size_t capacity, int32_t
index_cache_percentage,
- int64_t pk_index_cache_capacity,
uint32_t num_shards) {
- DCHECK(_s_instance == nullptr);
- static StoragePageCache instance(capacity, index_cache_percentage,
pk_index_cache_capacity,
- num_shards);
- _s_instance = &instance;
+namespace doris {
+StoragePageCache* StoragePageCache::create_global_cache(size_t capacity,
+ int32_t
index_cache_percentage,
+ int64_t
pk_index_cache_capacity,
+ uint32_t num_shards) {
+ StoragePageCache* res = new StoragePageCache(capacity,
index_cache_percentage,
+ pk_index_cache_capacity,
num_shards);
+ return res;
}
StoragePageCache::StoragePageCache(size_t capacity, int32_t
index_cache_percentage,
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index e1e48f2856..c5c4e99099 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -127,13 +127,13 @@ public:
static constexpr uint32_t kDefaultNumShards = 16;
// Create global instance of this class
- static void create_global_cache(size_t capacity, int32_t
index_cache_percentage,
- int64_t pk_index_cache_capacity,
- uint32_t num_shards = kDefaultNumShards);
+ static StoragePageCache* create_global_cache(size_t capacity, int32_t
index_cache_percentage,
+ int64_t
pk_index_cache_capacity,
+ uint32_t num_shards =
kDefaultNumShards);
// Return global instance.
// Client should call create_global_cache before.
- static StoragePageCache* instance() { return _s_instance; }
+ static StoragePageCache* instance() { return
ExecEnv::GetInstance()->get_storage_page_cache(); }
StoragePageCache(size_t capacity, int32_t index_cache_percentage,
int64_t pk_index_cache_capacity, uint32_t num_shards);
@@ -165,7 +165,6 @@ public:
private:
StoragePageCache();
- static StoragePageCache* _s_instance;
int32_t _index_cache_percentage = 0;
std::unique_ptr<DataPageCache> _data_page_cache = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index 055365cf31..f399f752c3 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -31,6 +31,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/inverted_index_compound_directory.h"
#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
+#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
@@ -38,8 +39,6 @@
namespace doris {
namespace segment_v2 {
-InvertedIndexSearcherCache* InvertedIndexSearcherCache::_s_instance = nullptr;
-
IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const
io::FileSystemSPtr& fs,
const
std::string& index_dir,
const
std::string& file_name) {
@@ -55,10 +54,10 @@ IndexSearcherPtr
InvertedIndexSearcherCache::build_index_searcher(const io::File
return index_searcher;
}
-void InvertedIndexSearcherCache::create_global_instance(size_t capacity,
uint32_t num_shards) {
- DCHECK(_s_instance == nullptr);
- static InvertedIndexSearcherCache instance(capacity, num_shards);
- _s_instance = &instance;
+InvertedIndexSearcherCache* InvertedIndexSearcherCache::create_global_instance(
+ size_t capacity, uint32_t num_shards) {
+ InvertedIndexSearcherCache* res = new InvertedIndexSearcherCache(capacity,
num_shards);
+ return res;
}
InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity,
uint32_t num_shards)
@@ -211,8 +210,6 @@ Cache::Handle* InvertedIndexSearcherCache::_insert(const
InvertedIndexSearcherCa
return lru_handle;
}
-InvertedIndexQueryCache* InvertedIndexQueryCache::_s_instance = nullptr;
-
bool InvertedIndexQueryCache::lookup(const CacheKey& key,
InvertedIndexQueryCacheHandle* handle) {
if (key.encode().empty()) {
return false;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h
b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
index c67e17ddda..19a9eb7734 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
@@ -37,6 +37,7 @@
#include "io/fs/path.h"
#include "olap/lru_cache.h"
#include "olap/rowset/segment_v2/inverted_index_query_type.h"
+#include "runtime/exec_env.h"
#include "runtime/memory/lru_cache_policy.h"
#include "runtime/memory/mem_tracker.h"
#include "util/slice.h"
@@ -72,7 +73,8 @@ public:
// Create global instance of this class.
// "capacity" is the capacity of lru cache.
- static void create_global_instance(size_t capacity, uint32_t num_shards =
16);
+ static InvertedIndexSearcherCache* create_global_instance(size_t capacity,
+ uint32_t
num_shards = 16);
void reset() {
_cache.reset();
@@ -80,15 +82,11 @@ public:
// Reset or clear the state of the object.
}
- static void reset_global_instance() {
- if (_s_instance != nullptr) {
- _s_instance->reset();
- }
- }
-
// Return global instance.
// Client should call create_global_cache before.
- static InvertedIndexSearcherCache* instance() { return _s_instance; }
+ static InvertedIndexSearcherCache* instance() {
+ return ExecEnv::GetInstance()->get_inverted_index_searcher_cache();
+ }
static IndexSearcherPtr build_index_searcher(const io::FileSystemSPtr& fs,
const std::string& index_dir,
@@ -123,7 +121,6 @@ private:
Cache::Handle* _insert(const InvertedIndexSearcherCache::CacheKey& key,
CacheValue* value);
private:
- static InvertedIndexSearcherCache* _s_instance;
std::unique_ptr<MemTracker> _mem_tracker = nullptr;
};
@@ -223,15 +220,16 @@ public:
};
// Create global instance of this class
- static void create_global_cache(size_t capacity, uint32_t num_shards = 16)
{
- DCHECK(_s_instance == nullptr);
- static InvertedIndexQueryCache instance(capacity, num_shards);
- _s_instance = &instance;
+ static InvertedIndexQueryCache* create_global_cache(size_t capacity,
uint32_t num_shards = 16) {
+ InvertedIndexQueryCache* res = new InvertedIndexQueryCache(capacity,
num_shards);
+ return res;
}
// Return global instance.
// Client should call create_global_cache before.
- static InvertedIndexQueryCache* instance() { return _s_instance; }
+ static InvertedIndexQueryCache* instance() {
+ return ExecEnv::GetInstance()->get_inverted_index_query_cache();
+ }
InvertedIndexQueryCache() = delete;
@@ -246,9 +244,6 @@ public:
InvertedIndexQueryCacheHandle* handle);
int64_t mem_consumption();
-
-private:
- static InvertedIndexQueryCache* _s_instance;
};
class InvertedIndexQueryCacheHandle {
diff --git a/be/src/olap/schema_cache.cpp b/be/src/olap/schema_cache.cpp
index 39d1a60a4c..7bf6b592c6 100644
--- a/be/src/olap/schema_cache.cpp
+++ b/be/src/olap/schema_cache.cpp
@@ -35,7 +35,9 @@
namespace doris {
-SchemaCache* SchemaCache::_s_instance = nullptr;
+SchemaCache* SchemaCache::instance() {
+ return ExecEnv::GetInstance()->schema_cache();
+}
// format: tabletId-unique_id1-uniqueid2...-version-type
std::string SchemaCache::get_schema_key(int32_t tablet_id, const
TabletSchemaSPtr& schema,
@@ -69,10 +71,4 @@ std::string SchemaCache::get_schema_key(int32_t tablet_id,
const std::vector<TCo
return key;
}
-void SchemaCache::create_global_instance(size_t capacity) {
- DCHECK(_s_instance == nullptr);
- static SchemaCache instance(capacity);
- _s_instance = &instance;
-}
-
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h
index 5d94c92837..dbce5336d8 100644
--- a/be/src/olap/schema_cache.h
+++ b/be/src/olap/schema_cache.h
@@ -48,7 +48,7 @@ class SchemaCache : public LRUCachePolicy {
public:
enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 };
- static SchemaCache* instance() { return _s_instance; }
+ static SchemaCache* instance();
static void create_global_instance(size_t capacity);
@@ -62,7 +62,7 @@ public:
// Get a shared cached schema from cache, schema_key is a subset of column
unique ids
template <typename SchemaType>
SchemaType get_schema(const std::string& schema_key) {
- if (!_s_instance || schema_key.empty()) {
+ if (!instance() || schema_key.empty()) {
return {};
}
auto lru_handle = _cache->lookup(schema_key);
@@ -84,7 +84,7 @@ public:
// Insert a shared Schema into cache, schema_key is full column unique ids
template <typename SchemaType>
void insert_schema(const std::string& key, SchemaType schema) {
- if (!_s_instance || key.empty()) {
+ if (!instance() || key.empty()) {
return;
}
CacheValue* value = new CacheValue;
@@ -115,12 +115,12 @@ public:
SchemaSPtr schema = nullptr;
};
-private:
SchemaCache(size_t capacity)
: LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CACHE, capacity,
LRUCacheType::NUMBER,
config::schema_cache_sweep_time_sec) {}
+
+private:
static constexpr char SCHEMA_DELIMITER = '-';
- static SchemaCache* _s_instance;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 617b5e6fff..8d759cce8e 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -24,12 +24,8 @@
namespace doris {
-SegmentLoader* SegmentLoader::_s_instance = nullptr;
-
-void SegmentLoader::create_global_instance(size_t capacity) {
- DCHECK(_s_instance == nullptr);
- static SegmentLoader instance(capacity);
- _s_instance = &instance;
+SegmentLoader* SegmentLoader::instance() {
+ return ExecEnv::GetInstance()->segment_loader();
}
bool SegmentCache::lookup(const SegmentCache::CacheKey& key,
SegmentCacheHandle* handle) {
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index d5849e6097..d6b1d07940 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -95,6 +95,8 @@ public:
class SegmentLoader {
public:
+ static SegmentLoader* instance();
+
// Create global instance of this class.
// "capacity" is the capacity of lru cache.
// TODO: Currently we use the number of rowset as the cache capacity.
@@ -102,11 +104,6 @@ public:
// This is because currently we cannot accurately estimate the memory
occupied by a segment.
// After the estimation of segment memory usage is provided later, it is
recommended
// to use Memory as the capacity limit of the cache.
- static void create_global_instance(size_t capacity);
-
- // Return global instance.
- // Client should call create_global_cache before.
- static SegmentLoader* instance() { return _s_instance; }
SegmentLoader(size_t capacity) { _segment_cache =
std::make_unique<SegmentCache>(capacity); }
@@ -121,8 +118,6 @@ public:
private:
SegmentLoader();
-
- static SegmentLoader* _s_instance;
std::unique_ptr<SegmentCache> _segment_cache = nullptr;
};
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index b425076fc0..8bace65617 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -93,8 +93,6 @@ using namespace ErrorCode;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
-StorageEngine* StorageEngine::_s_instance = nullptr;
-
static Status _validate_options(const EngineOptions& options) {
if (options.store_paths.empty()) {
return Status::InternalError("store paths is empty");
@@ -102,14 +100,11 @@ static Status _validate_options(const EngineOptions&
options) {
return Status::OK();
}
-Status StorageEngine::open(const EngineOptions& options,
- std::unique_ptr<StorageEngine>* engine_ptr) {
- RETURN_IF_ERROR(_validate_options(options));
- LOG(INFO) << "starting backend using uid:" <<
options.backend_uid.to_string();
- std::unique_ptr<StorageEngine> engine(new StorageEngine(options));
- RETURN_NOT_OK_STATUS_WITH_WARN(engine->_open(), "open engine failed");
+Status StorageEngine::open() {
+ RETURN_IF_ERROR(_validate_options(_options));
+ LOG(INFO) << "starting backend using uid:" <<
_options.backend_uid.to_string();
+ RETURN_NOT_OK_STATUS_WITH_WARN(_open(), "open engine failed");
LOG(INFO) << "success to init storage engine.";
- *engine_ptr = std::move(engine);
return Status::OK();
}
@@ -130,7 +125,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_default_rowset_type(BETA_ROWSET),
_heartbeat_flags(nullptr),
_stream_load_recorder(nullptr) {
- _s_instance = this;
REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() {
// std::lock_guard<std::mutex> lock(_gc_mutex);
return _unused_rowsets.size();
@@ -139,30 +133,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
StorageEngine::~StorageEngine() {
stop();
-
- DEREGISTER_HOOK_METRIC(unused_rowsets_count);
-
- if (_base_compaction_thread_pool) {
- _base_compaction_thread_pool->shutdown();
- }
- if (_cumu_compaction_thread_pool) {
- _cumu_compaction_thread_pool->shutdown();
- }
- if (_single_replica_compaction_thread_pool) {
- _single_replica_compaction_thread_pool->shutdown();
- }
-
- if (_seg_compaction_thread_pool) {
- _seg_compaction_thread_pool->shutdown();
- }
- if (_tablet_meta_checkpoint_thread_pool) {
- _tablet_meta_checkpoint_thread_pool->shutdown();
- }
- if (_cold_data_compaction_thread_pool) {
- _cold_data_compaction_thread_pool->shutdown();
- }
_clear();
- _s_instance = nullptr;
}
Status StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
@@ -543,7 +514,10 @@ void StorageEngine::_exit_if_too_many_disks_are_failed() {
}
void StorageEngine::stop() {
- if (_stopped) return;
+ if (_stopped) {
+ LOG(WARNING) << "Storage engine is stopped twice.";
+ return;
+ }
// trigger the waiting threads
notify_listeners();
@@ -582,7 +556,32 @@ void StorageEngine::stop() {
THREADS_JOIN(_path_gc_threads);
THREADS_JOIN(_path_scan_threads);
#undef THREADS_JOIN
+
+ if (_base_compaction_thread_pool) {
+ _base_compaction_thread_pool->shutdown();
+ }
+ if (_cumu_compaction_thread_pool) {
+ _cumu_compaction_thread_pool->shutdown();
+ }
+ if (_single_replica_compaction_thread_pool) {
+ _single_replica_compaction_thread_pool->shutdown();
+ }
+
+ if (_seg_compaction_thread_pool) {
+ _seg_compaction_thread_pool->shutdown();
+ }
+ if (_tablet_meta_checkpoint_thread_pool) {
+ _tablet_meta_checkpoint_thread_pool->shutdown();
+ }
+ if (_cold_data_compaction_thread_pool) {
+ _cold_data_compaction_thread_pool->shutdown();
+ }
+
+ _memtable_flush_executor.reset(nullptr);
+ _calc_delete_bitmap_executor.reset(nullptr);
+
_stopped = true;
+ LOG(INFO) << "Storage engine is stopped.";
}
void StorageEngine::_clear() {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 02e6ec5f7b..b5cb99ec21 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -47,6 +47,7 @@
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet.h"
#include "olap/task/index_builder.h"
+#include "runtime/exec_env.h"
#include "runtime/heartbeat_flags.h"
#include "util/countdown_latch.h"
@@ -81,9 +82,9 @@ public:
StorageEngine(const EngineOptions& options);
~StorageEngine();
- static Status open(const EngineOptions& options,
std::unique_ptr<StorageEngine>* engine_ptr);
+ [[nodiscard]] Status open();
- static StorageEngine* instance() { return _s_instance; }
+ static StorageEngine* instance() { return
ExecEnv::GetInstance()->get_storage_engine(); }
Status create_tablet(const TCreateTabletReq& request, RuntimeProfile*
profile);
@@ -178,6 +179,7 @@ public:
// option: update disk usage after sweep
Status start_trash_sweep(double* usage, bool ignore_guard = false);
+ // Must call stop() before storage_engine is deconstructed
void stop();
void get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
@@ -373,8 +375,7 @@ private:
int32_t _effective_cluster_id;
bool _is_all_cluster_id_exist;
- static StorageEngine* _s_instance;
- bool _stopped;
+ std::atomic_bool _stopped {false};
std::mutex _gc_mutex;
// map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we
need custom hash func
diff --git a/be/src/olap/tablet_schema_cache.cpp
b/be/src/olap/tablet_schema_cache.cpp
index 2e4b221492..90880665ad 100644
--- a/be/src/olap/tablet_schema_cache.cpp
+++ b/be/src/olap/tablet_schema_cache.cpp
@@ -19,12 +19,7 @@
namespace doris {
-TabletSchemaCache::~TabletSchemaCache() {
- stop_and_join();
-}
-
TabletSchemaSPtr TabletSchemaCache::insert(const std::string& key) {
- DCHECK(_s_instance != nullptr);
std::lock_guard guard(_mtx);
auto iter = _cache.find(key);
if (iter == _cache.end()) {
@@ -41,11 +36,18 @@ TabletSchemaSPtr TabletSchemaCache::insert(const
std::string& key) {
return iter->second;
}
+void TabletSchemaCache::start() {
+ std::thread t(&TabletSchemaCache::_recycle, this);
+ t.detach();
+ LOG(INFO) << "TabletSchemaCache started";
+}
+
void TabletSchemaCache::stop() {
_should_stop = true;
while (!_is_stopped) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
+ LOG(INFO) << "TabletSchemaCache stopped";
}
/**
diff --git a/be/src/olap/tablet_schema_cache.h
b/be/src/olap/tablet_schema_cache.h
index 6c692aaf46..93798983c8 100644
--- a/be/src/olap/tablet_schema_cache.h
+++ b/be/src/olap/tablet_schema_cache.h
@@ -24,31 +24,28 @@
#include <unordered_map>
#include "olap/tablet_schema.h"
+#include "runtime/exec_env.h"
#include "util/doris_metrics.h"
namespace doris {
class TabletSchemaCache {
public:
- ~TabletSchemaCache();
+ ~TabletSchemaCache() = default;
- static void create_global_schema_cache() {
- DCHECK(_s_instance == nullptr);
- static TabletSchemaCache instance;
- _s_instance = &instance;
- std::thread t(&TabletSchemaCache::_recycle, _s_instance);
- t.detach();
+ static TabletSchemaCache* create_global_schema_cache() {
+ TabletSchemaCache* res = new TabletSchemaCache();
+ return res;
}
- static TabletSchemaCache* instance() { return _s_instance; }
-
- static void stop_and_join() {
- DCHECK(_s_instance != nullptr);
- _s_instance->stop();
+ static TabletSchemaCache* instance() {
+ return ExecEnv::GetInstance()->get_tablet_schema_cache();
}
TabletSchemaSPtr insert(const std::string& key);
+ void start();
+
void stop();
private:
@@ -58,7 +55,6 @@ private:
void _recycle();
private:
- static inline TabletSchemaCache* _s_instance = nullptr;
std::mutex _mtx;
std::unordered_map<std::string, TabletSchemaSPtr> _cache;
std::atomic_bool _should_stop = {false};
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 0ce4115c0c..c4278c3807 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -189,7 +189,7 @@ void
BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
}
TaskScheduler::~TaskScheduler() {
- shutdown();
+ stop();
}
Status TaskScheduler::start() {
@@ -340,7 +340,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task,
PipelineTaskState state)
task->fragment_context()->close_a_pipeline();
}
-void TaskScheduler::shutdown() {
+void TaskScheduler::stop() {
if (!this->_shutdown.load()) {
this->_shutdown.store(true);
_blocked_task_scheduler->shutdown();
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index ac9389c088..13b9e734d6 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -89,7 +89,7 @@ public:
Status start();
- void shutdown();
+ void stop();
TaskQueue* task_queue() const { return _task_queue.get(); }
diff --git a/be/src/runtime/broker_mgr.cpp b/be/src/runtime/broker_mgr.cpp
index 6d17f32ffb..613b916d06 100644
--- a/be/src/runtime/broker_mgr.cpp
+++ b/be/src/runtime/broker_mgr.cpp
@@ -53,7 +53,7 @@ BrokerMgr::BrokerMgr(ExecEnv* exec_env) :
_exec_env(exec_env), _stop_background_
});
}
-BrokerMgr::~BrokerMgr() {
+void BrokerMgr::stop() {
DEREGISTER_HOOK_METRIC(broker_count);
_stop_background_threads_latch.count_down();
if (_ping_thread) {
diff --git a/be/src/runtime/broker_mgr.h b/be/src/runtime/broker_mgr.h
index fd7a67e43c..d9238aed2c 100644
--- a/be/src/runtime/broker_mgr.h
+++ b/be/src/runtime/broker_mgr.h
@@ -35,8 +35,9 @@ class Thread;
class BrokerMgr {
public:
BrokerMgr(ExecEnv* exec_env);
- ~BrokerMgr();
+ ~BrokerMgr() = default;
void init();
+ void stop();
const std::string& get_client_id(const TNetworkAddress& address);
private:
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index c30fb20db8..27986f5de3 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -23,6 +23,7 @@
#include <utility>
#include "common/config.h"
+#include "olap/olap_define.h"
#include "runtime/fragment_mgr.h"
#include "runtime/frontend_info.h"
#include "time.h"
@@ -31,10 +32,8 @@
namespace doris {
-ExecEnv::ExecEnv() = default;
-
ExecEnv::~ExecEnv() {
- _destroy();
+ destroy();
}
const std::string& ExecEnv::token() const {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f6c61d26f3..fd54d44166 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -17,7 +17,7 @@
#pragma once
-#include <gen_cpp/HeartbeatService_types.h>
+#include <common/multi_version.h>
#include <stddef.h>
#include <algorithm>
@@ -32,12 +32,13 @@
#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
+#include "olap/olap_define.h"
#include "olap/options.h"
+#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove
this include header
#include "util/threadpool.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
namespace doris {
-struct FrontendInfo;
namespace vectorized {
class VDataStreamMgr;
class ScannerScheduler;
@@ -49,6 +50,15 @@ class TaskScheduler;
namespace taskgroup {
class TaskGroupManager;
}
+namespace io {
+class S3FileBufferPool;
+class FileCacheFactory;
+} // namespace io
+namespace segment_v2 {
+class InvertedIndexSearcherCache;
+class InvertedIndexQueryCache;
+} // namespace segment_v2
+
class BfdParser;
class BrokerMgr;
template <class T>
@@ -80,6 +90,14 @@ class HeartbeatFlags;
class FrontendServiceClient;
class FileMetaCache;
class GroupCommitMgr;
+class TabletSchemaCache;
+class UserFunctionCache;
+class SchemaCache;
+class StoragePageCache;
+class SegmentLoader;
+class LookupConnectionCache;
+class RowCache;
+class CacheManager;
inline bool k_doris_exit = false;
@@ -89,9 +107,15 @@ inline bool k_doris_exit = false;
// once to properly initialise service state.
class ExecEnv {
public:
+ // Empty destructor because the compiler-generated one requires full
+ // declarations for classes in scoped_ptrs.
+ ~ExecEnv();
+
// Initial exec environment. must call this to init all
- static Status init(ExecEnv* env, const std::vector<StorePath>&
store_paths);
- static void destroy(ExecEnv* exec_env);
+ [[nodiscard]] static Status init(ExecEnv* env, const
std::vector<StorePath>& store_paths);
+
+ // Stop all threads and delete resources.
+ void destroy();
/// Returns the first created exec env instance. In a normal doris, this is
/// the only instance. In test setups with multiple ExecEnv's per process,
@@ -101,10 +125,6 @@ public:
return &s_exec_env;
}
- // Empty destructor because the compiler-generated one requires full
- // declarations for classes in scoped_ptrs.
- ~ExecEnv();
-
static bool ready() { return _s_ready.load(std::memory_order_acquire); }
const std::string& token() const;
ExternalScanContextMgr* external_scan_context_mgr() { return
_external_scan_context_mgr; }
@@ -152,12 +172,15 @@ public:
void init_download_cache_buf();
void init_download_cache_required_components();
Status init_pipeline_task_scheduler();
+ void init_file_cache_factory();
char* get_download_cache_buf(ThreadPoolToken* token) {
if (_download_cache_buf_map.find(token) ==
_download_cache_buf_map.end()) {
return nullptr;
}
return _download_cache_buf_map[token].get();
}
+ io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; }
+ UserFunctionCache* user_function_cache() { return _user_function_cache; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
TMasterInfo* master_info() { return _master_info; }
@@ -185,14 +208,11 @@ public:
FileMetaCache* file_meta_cache() { return _file_meta_cache; }
MemTableMemoryLimiter* memtable_memory_limiter() { return
_memtable_memory_limiter.get(); }
#ifdef BE_TEST
+ void set_ready() { this->_s_ready = true; }
+ void set_not_ready() { this->_s_ready = false; }
void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) {
_memtable_memory_limiter.reset(limiter);
}
-#endif
- vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
- std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }
-
- // only for unit test
void set_master_info(TMasterInfo* master_info) { this->_master_info =
master_info; }
void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr>
new_load_stream_mgr) {
this->_new_load_stream_mgr = new_load_stream_mgr;
@@ -201,16 +221,45 @@ public:
this->_stream_load_executor = stream_load_executor;
}
+ void set_storage_engine(StorageEngine* se) { this->_storage_engine = se; }
+ void set_cache_manager(CacheManager* cm) { this->_cache_manager = cm; }
+ void set_tablet_schema_cache(TabletSchemaCache* c) {
this->_tablet_schema_cache = c; }
+ void set_storage_page_cache(StoragePageCache* c) {
this->_storage_page_cache = c; }
+ void set_segment_loader(SegmentLoader* sl) { this->_segment_loader = sl; }
+ void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) {
+ this->_routine_load_task_executor = r;
+ }
+
+#endif
+ vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
+ std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }
+
void wait_for_all_tasks_done();
void update_frontends(const std::vector<TFrontendInfo>& new_infos);
std::map<TNetworkAddress, FrontendInfo> get_frontends();
std::map<TNetworkAddress, FrontendInfo> get_running_frontends();
+ TabletSchemaCache* get_tablet_schema_cache() { return
_tablet_schema_cache; }
+ StorageEngine* get_storage_engine() { return _storage_engine; }
+ io::S3FileBufferPool* get_s3_file_buffer_pool() { return _s3_buffer_pool; }
+ SchemaCache* schema_cache() { return _schema_cache; }
+ StoragePageCache* get_storage_page_cache() { return _storage_page_cache; }
+ SegmentLoader* segment_loader() { return _segment_loader; }
+ LookupConnectionCache* get_lookup_connection_cache() { return
_lookup_connection_cache; }
+ RowCache* get_row_cache() { return _row_cache; }
+ CacheManager* get_cache_manager() { return _cache_manager; }
+ segment_v2::InvertedIndexSearcherCache*
get_inverted_index_searcher_cache() {
+ return _inverted_index_searcher_cache;
+ }
+ segment_v2::InvertedIndexQueryCache* get_inverted_index_query_cache() {
+ return _inverted_index_query_cache;
+ }
+
private:
- ExecEnv();
+ ExecEnv() = default;
- Status _init(const std::vector<StorePath>& store_paths);
+ [[nodiscard]] Status _init(const std::vector<StorePath>& store_paths);
void _destroy();
Status _init_mem_env();
@@ -221,6 +270,8 @@ private:
inline static std::atomic_bool _s_ready {false};
std::vector<StorePath> _store_paths;
+ io::FileCacheFactory* _file_cache_factory = nullptr;
+ UserFunctionCache* _user_function_cache = nullptr;
// Leave protected so that subclasses can override
ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
@@ -268,6 +319,7 @@ private:
BfdParser* _bfd_parser = nullptr;
BrokerMgr* _broker_mgr = nullptr;
LoadChannelMgr* _load_channel_mgr = nullptr;
+ // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control
its life cycle.
std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;
@@ -289,6 +341,22 @@ private:
std::mutex _frontends_lock;
std::map<TNetworkAddress, FrontendInfo> _frontends;
GroupCommitMgr* _group_commit_mgr = nullptr;
+
+ // Maybe we should use unique_ptr, but it needs a complete type, which means
we need
+ // to include many headers, and for some cpp files that do not need classes
like TabletSchemaCache,
+ // these redundant headers could introduce potential bugs; at least, more
headers mean slower compiles.
+ // So we choose to use raw pointers; please remember to delete these
pointers in the destructor.
+ TabletSchemaCache* _tablet_schema_cache = nullptr;
+ io::S3FileBufferPool* _s3_buffer_pool = nullptr;
+ StorageEngine* _storage_engine = nullptr;
+ SchemaCache* _schema_cache = nullptr;
+ StoragePageCache* _storage_page_cache = nullptr;
+ SegmentLoader* _segment_loader = nullptr;
+ LookupConnectionCache* _lookup_connection_cache = nullptr;
+ RowCache* _row_cache = nullptr;
+ CacheManager* _cache_manager = nullptr;
+ segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache =
nullptr;
+ segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
};
template <>
@@ -305,4 +373,8 @@ ExecEnv::get_client_cache<TPaloBrokerServiceClient>() {
return _broker_client_cache;
}
+inline segment_v2::InvertedIndexQueryCache* GetInvertedIndexQueryCache() {
+ return ExecEnv::GetInstance()->get_inverted_index_query_cache();
+}
+
} // namespace doris
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 0bc375aac4..bcdbf49801 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -16,6 +16,7 @@
// under the License.
// IWYU pragma: no_include <bthread/errno.h>
+#include <common/multi_version.h>
#include <errno.h> // IWYU pragma: keep
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/Metrics_types.h>
@@ -36,7 +37,9 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
+#include "io/cache/block/block_file_cache_factory.h"
#include "io/fs/file_meta_cache.h"
+#include "io/fs/s3_file_write_bufferpool.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
@@ -44,6 +47,7 @@
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/schema_cache.h"
#include "olap/segment_loader.h"
+#include "olap/storage_engine.h"
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
#include "runtime/block_spill_manager.h"
@@ -69,7 +73,9 @@
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/task_group/task_group_manager.h"
#include "runtime/thread_context.h"
+#include "runtime/user_function_cache.h"
#include "service/backend_options.h"
+#include "service/backend_service.h"
#include "service/point_query_executor.h"
#include "util/bfd_parser.h"
#include "util/bit_util.h"
@@ -82,6 +88,7 @@
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/threadpool.h"
+#include "util/thrift_rpc_helper.h"
#include "util/timezone_utils.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
@@ -135,7 +142,8 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
}
init_doris_metrics(store_paths);
_store_paths = store_paths;
-
+ _user_function_cache = new UserFunctionCache();
+ _user_function_cache->init(doris::config::user_function_dir);
_external_scan_context_mgr = new ExternalScanContextMgr(this);
_vstream_mgr = new doris::vectorized::VDataStreamMgr();
_result_mgr = new ResultBufferMgr();
@@ -146,6 +154,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
TimezoneUtils::load_timezone_names();
+ // _global_zone_cache is not owned by ExecEnv ... maybe should refactor.
_global_zone_cache = std::make_unique<vectorized::ZoneList>();
TimezoneUtils::load_timezones_to_cache(*_global_zone_cache);
@@ -176,7 +185,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
.set_max_threads(std::numeric_limits<int>::max())
.set_max_queue_size(config::fragment_pool_queue_size)
.build(&_join_node_thread_pool);
-
+ init_file_cache_factory();
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_task_group_manager = new taskgroup::TaskGroupManager();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
@@ -205,12 +214,16 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
_result_mgr->init();
Status status = _load_path_mgr->init();
if (!status.ok()) {
- LOG(ERROR) << "load path mgr init failed." << status;
- exit(-1);
+ LOG(ERROR) << "Load path mgr init failed. " << status;
+ return status;
}
_broker_mgr->init();
_small_file_mgr->init();
- _scanner_scheduler->init(this);
+ status = _scanner_scheduler->init();
+ if (!status.ok()) {
+ LOG(ERROR) << "Scanner scheduler init failed. " << status;
+ return status;
+ }
_init_mem_env();
@@ -218,7 +231,33 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
_heartbeat_flags = new HeartbeatFlags();
_register_metrics();
+
+ _tablet_schema_cache = TabletSchemaCache::create_global_schema_cache();
+ _tablet_schema_cache->start();
+
+ // S3 buffer pool
+ _s3_buffer_pool = new io::S3FileBufferPool();
+ _s3_buffer_pool->init(config::s3_write_buffer_whole_size,
config::s3_write_buffer_size,
+ this->buffered_reader_prefetch_thread_pool());
+
+ // Storage engine
+ doris::EngineOptions options;
+ options.store_paths = store_paths;
+ options.backend_uid = doris::UniqueId::gen_uid();
+ _storage_engine = new StorageEngine(options);
+ auto st = _storage_engine->open();
+ if (!st.ok()) {
+ LOG(ERROR) << "Lail to open StorageEngine, res=" << st;
+ return st;
+ }
+ _storage_engine->set_heartbeat_flags(this->heartbeat_flags());
+ if (st = _storage_engine->start_bg_threads(); !st.ok()) {
+ LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" <<
st;
+ return st;
+ }
+
_s_ready = true;
+
return Status::OK();
}
@@ -244,6 +283,56 @@ Status ExecEnv::init_pipeline_task_scheduler() {
return Status::OK();
}
+void ExecEnv::init_file_cache_factory() {
+ // Load file cache before starting up daemon threads to make sure
StorageEngine is ready.
+ if (doris::config::enable_file_cache) {
+ _file_cache_factory = new io::FileCacheFactory();
+ io::IFileCache::init();
+ std::unordered_set<std::string> cache_path_set;
+ std::vector<doris::CachePath> cache_paths;
+ Status olap_res =
+ doris::parse_conf_cache_paths(doris::config::file_cache_path,
cache_paths);
+ if (!olap_res) {
+ LOG(FATAL) << "parse config file cache path failed, path="
+ << doris::config::file_cache_path;
+ exit(-1);
+ }
+
+ std::unique_ptr<doris::ThreadPool> file_cache_init_pool;
+ doris::ThreadPoolBuilder("FileCacheInitThreadPool")
+ .set_min_threads(cache_paths.size())
+ .set_max_threads(cache_paths.size())
+ .build(&file_cache_init_pool);
+
+ std::list<doris::Status> cache_status;
+ for (auto& cache_path : cache_paths) {
+ if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
+ LOG(WARNING) << fmt::format("cache path {} is duplicate",
cache_path.path);
+ continue;
+ }
+
+ olap_res = file_cache_init_pool->submit_func(std::bind(
+ &io::FileCacheFactory::create_file_cache,
_file_cache_factory, cache_path.path,
+ cache_path.init_settings(),
&(cache_status.emplace_back())));
+
+ if (!olap_res.ok()) {
+ LOG(FATAL) << "failed to init file cache, err: " << olap_res;
+ exit(-1);
+ }
+ cache_path_set.emplace(cache_path.path);
+ }
+
+ file_cache_init_pool->wait();
+ for (const auto& status : cache_status) {
+ if (!status.ok()) {
+ LOG(FATAL) << "failed to init file cache, err: " << status;
+ exit(-1);
+ }
+ }
+ }
+ return;
+}
+
Status ExecEnv::_init_mem_env() {
bool is_percent = false;
std::stringstream ss;
@@ -262,7 +351,7 @@ Status ExecEnv::_init_mem_env() {
}
// 3. init storage page cache
- CacheManager::create_global_instance();
+ _cache_manager = CacheManager::create_global_instance();
int64_t storage_cache_limit =
ParseUtil::parse_mem_spec(config::storage_page_cache_limit,
MemInfo::mem_limit(),
@@ -286,8 +375,8 @@ Status ExecEnv::_init_mem_env() {
while (!is_percent && pk_storage_page_cache_limit > MemInfo::mem_limit() /
2) {
pk_storage_page_cache_limit = storage_cache_limit / 2;
}
- StoragePageCache::create_global_cache(storage_cache_limit,
index_percentage,
- pk_storage_page_cache_limit,
num_shards);
+ _storage_page_cache = StoragePageCache::create_global_cache(
+ storage_cache_limit, index_percentage,
pk_storage_page_cache_limit, num_shards);
LOG(INFO) << "Storage page cache memory limit: "
<< PrettyPrinter::print(storage_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::storage_page_cache_limit;
@@ -300,7 +389,7 @@ Status ExecEnv::_init_mem_env() {
// Reason same as buffer_pool_limit
row_cache_mem_limit = row_cache_mem_limit / 2;
}
- RowCache::create_global_cache(row_cache_mem_limit);
+ _row_cache = RowCache::create_global_cache(row_cache_mem_limit);
LOG(INFO) << "Row cache memory limit: "
<< PrettyPrinter::print(row_cache_mem_limit, TUnit::BYTES)
<< ", origin config value: " << config::row_cache_mem_limit;
@@ -322,11 +411,12 @@ Status ExecEnv::_init_mem_env() {
}
LOG(INFO) << "segment_cache_capacity <= fd_number * 2 / 5, fd_number: " <<
fd_number
<< " segment_cache_capacity: " << segment_cache_capacity;
- SegmentLoader::create_global_instance(segment_cache_capacity);
+ _segment_loader = new SegmentLoader(segment_cache_capacity);
- SchemaCache::create_global_instance(config::schema_cache_capacity);
+ _schema_cache = new SchemaCache(config::schema_cache_capacity);
-
LookupConnectionCache::create_global_instance(config::lookup_connection_cache_bytes_limit);
+ _lookup_connection_cache = LookupConnectionCache::create_global_instance(
+ config::lookup_connection_cache_bytes_limit);
// use memory limit
int64_t inverted_index_cache_limit =
@@ -336,7 +426,8 @@ Status ExecEnv::_init_mem_env() {
// Reason same as buffer_pool_limit
inverted_index_cache_limit = inverted_index_cache_limit / 2;
}
-
InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit);
+ _inverted_index_searcher_cache =
+
InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit);
LOG(INFO) << "Inverted index searcher cache memory limit: "
<< PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
<< ", origin config value: " <<
config::inverted_index_searcher_cache_limit;
@@ -349,7 +440,8 @@ Status ExecEnv::_init_mem_env() {
// Reason same as buffer_pool_limit
inverted_index_query_cache_limit = inverted_index_query_cache_limit /
2;
}
-
InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit);
+ _inverted_index_query_cache =
+
InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit);
LOG(INFO) << "Inverted index query match cache memory limit: "
<< PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
<< ", origin config value: " <<
config::inverted_index_query_cache_limit;
@@ -410,58 +502,120 @@ void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size);
}
-void ExecEnv::_destroy() {
+// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a
Stop method.
+// We need to stop all threads before releasing resource.
+void ExecEnv::destroy() {
//Only destroy once after init
if (!ready()) {
return;
}
// Memory barrier to prevent other threads from accessing destructed
resources
_s_ready = false;
+
+ SAFE_STOP(_tablet_schema_cache);
+ SAFE_STOP(_load_channel_mgr);
+ SAFE_STOP(_scanner_scheduler);
+ SAFE_STOP(_broker_mgr);
+ SAFE_STOP(_load_path_mgr);
+ SAFE_STOP(_result_mgr);
+ SAFE_STOP(_group_commit_mgr);
+ // _routine_load_task_executor should be stopped before
_new_load_stream_mgr.
+ SAFE_STOP(_routine_load_task_executor);
+ SAFE_STOP(_pipeline_task_scheduler);
+ SAFE_STOP(_pipeline_task_group_scheduler);
+ SAFE_STOP(_external_scan_context_mgr);
+ SAFE_STOP(_fragment_mgr);
+ // NewLoadStreamMgr should be destroyed before storage_engine & after
fragment_mgr stopped.
+ _new_load_stream_mgr.reset();
+ _stream_load_executor.reset();
+ SAFE_STOP(_storage_engine);
+ SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
+ SAFE_SHUTDOWN(_join_node_thread_pool);
+ SAFE_SHUTDOWN(_send_report_thread_pool);
+ SAFE_SHUTDOWN(_send_batch_thread_pool);
+ SAFE_SHUTDOWN(_serial_download_cache_thread_token);
+ SAFE_SHUTDOWN(_download_cache_thread_pool);
+
+ // Free resource after threads are stopped.
+ // Some threads are still running, like threads created by
_new_load_stream_mgr ...
+ SAFE_DELETE(_s3_buffer_pool);
+ SAFE_DELETE(_tablet_schema_cache);
_deregister_metrics();
- SAFE_DELETE(_internal_client_cache);
- SAFE_DELETE(_function_client_cache);
SAFE_DELETE(_load_channel_mgr);
+ _memtable_memory_limiter.reset(nullptr);
+
+ // shared_ptr may not need to be reset
+ // _brpc_iobuf_block_memory_tracker.reset();
+ // _page_no_cache_mem_tracker.reset();
+ // _experimental_mem_tracker.reset();
+ // _orphan_mem_tracker.reset();
+
+ SAFE_DELETE(_block_spill_mgr);
+ SAFE_DELETE(_inverted_index_query_cache);
+ SAFE_DELETE(_inverted_index_searcher_cache);
+ SAFE_DELETE(_lookup_connection_cache);
+ SAFE_DELETE(_schema_cache);
+ SAFE_DELETE(_segment_loader);
+ SAFE_DELETE(_row_cache);
+
+ // StorageEngine must be destroyed before _page_no_cache_mem_tracker.reset
+ // StorageEngine must be destroyed before _cache_manager is destroyed
+ SAFE_DELETE(_storage_engine);
+
+ // _scanner_scheduler must be destroyed before _storage_page_cache
+ SAFE_DELETE(_scanner_scheduler);
+ // _storage_page_cache must be destroyed before _cache_manager
+ SAFE_DELETE(_storage_page_cache);
+ // cache_manager must be destroyed after _inverted_index_query_cache
+ // https://github.com/apache/doris/issues/24082#issuecomment-1712544039
+ SAFE_DELETE(_cache_manager);
+
+ SAFE_DELETE(_small_file_mgr);
SAFE_DELETE(_broker_mgr);
- SAFE_DELETE(_bfd_parser);
SAFE_DELETE(_load_path_mgr);
- SAFE_DELETE(_pipeline_task_scheduler);
- SAFE_DELETE(_pipeline_task_group_scheduler);
- SAFE_DELETE(_task_group_manager);
+ SAFE_DELETE(_result_mgr);
+ SAFE_DELETE(_file_meta_cache);
+ SAFE_DELETE(_group_commit_mgr);
+ SAFE_DELETE(_routine_load_task_executor);
+ // _stream_load_executor
+ SAFE_DELETE(_function_client_cache);
+ SAFE_DELETE(_internal_client_cache);
+
+ SAFE_DELETE(_bfd_parser);
+ SAFE_DELETE(_result_cache);
SAFE_DELETE(_fragment_mgr);
+ SAFE_DELETE(_task_group_manager);
+ SAFE_DELETE(_pipeline_task_group_scheduler);
+ SAFE_DELETE(_pipeline_task_scheduler);
+ SAFE_DELETE(_file_cache_factory);
+ // TODO(zhiqiang): Maybe we should call shutdown before release thread
pool?
+ _join_node_thread_pool.reset(nullptr);
+ _send_report_thread_pool.reset(nullptr);
+ _buffered_reader_prefetch_thread_pool.reset(nullptr);
+ _send_batch_thread_pool.reset(nullptr);
+
SAFE_DELETE(_broker_client_cache);
SAFE_DELETE(_frontend_client_cache);
SAFE_DELETE(_backend_client_cache);
- SAFE_DELETE(_result_mgr);
SAFE_DELETE(_result_queue_mgr);
- SAFE_DELETE(_routine_load_task_executor);
+
+ SAFE_DELETE(_vstream_mgr);
SAFE_DELETE(_external_scan_context_mgr);
+ SAFE_DELETE(_user_function_cache);
+
+ _serial_download_cache_thread_token.reset(nullptr);
+ _download_cache_thread_pool.reset(nullptr);
+
+ // _heartbeat_flags must be destroyed after storage engine
SAFE_DELETE(_heartbeat_flags);
- SAFE_DELETE(_scanner_scheduler);
- SAFE_DELETE(_group_commit_mgr);
- SAFE_DELETE(_file_meta_cache);
+
// Master Info is a thrift object, it could be the last one to deconstruct.
// Master info should be deconstruct later than fragment manager, because
fragment will
// access master_info.backend id to access some info. If there is a
running query and master
// info is deconstructed then BE process will core at coordinator back
method in fragment mgr.
SAFE_DELETE(_master_info);
- _new_load_stream_mgr.reset();
- _memtable_memory_limiter.reset(nullptr);
- _send_batch_thread_pool.reset(nullptr);
- _buffered_reader_prefetch_thread_pool.reset(nullptr);
- _send_report_thread_pool.reset(nullptr);
- _join_node_thread_pool.reset(nullptr);
- _serial_download_cache_thread_token.reset(nullptr);
- _download_cache_thread_pool.reset(nullptr);
- _orphan_mem_tracker.reset();
- _experimental_mem_tracker.reset();
- _page_no_cache_mem_tracker.reset();
- _brpc_iobuf_block_memory_tracker.reset();
- InvertedIndexSearcherCache::reset_global_instance();
-}
-
-void ExecEnv::destroy(ExecEnv* env) {
- env->_destroy();
+ LOG(INFO) << "Doris exec envorinment is destoried.";
}
} // namespace doris
diff --git a/be/src/runtime/external_scan_context_mgr.cpp
b/be/src/runtime/external_scan_context_mgr.cpp
index 2a3dc92521..6c51cfccbe 100644
--- a/be/src/runtime/external_scan_context_mgr.cpp
+++ b/be/src/runtime/external_scan_context_mgr.cpp
@@ -50,7 +50,7 @@ ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv*
exec_env)
});
}
-ExternalScanContextMgr::~ExternalScanContextMgr() {
+void ExternalScanContextMgr::stop() {
DEREGISTER_HOOK_METRIC(active_scan_context_count);
_stop_background_threads_latch.count_down();
if (_keep_alive_reaper) {
diff --git a/be/src/runtime/external_scan_context_mgr.h
b/be/src/runtime/external_scan_context_mgr.h
index 93ae8d360a..4925821f3a 100644
--- a/be/src/runtime/external_scan_context_mgr.h
+++ b/be/src/runtime/external_scan_context_mgr.h
@@ -52,7 +52,9 @@ public:
class ExternalScanContextMgr {
public:
ExternalScanContextMgr(ExecEnv* exec_env);
- ~ExternalScanContextMgr();
+ ~ExternalScanContextMgr() = default;
+
+ void stop();
Status create_scan_context(std::shared_ptr<ScanContext>* p_context);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 703019a39b..adbe401356 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -139,7 +139,9 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
CHECK(s.ok()) << s.to_string();
}
-FragmentMgr::~FragmentMgr() {
+FragmentMgr::~FragmentMgr() {}
+
+void FragmentMgr::stop() {
DEREGISTER_HOOK_METRIC(plan_fragment_count);
DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
_stop_background_threads_latch.count_down();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 0cf5cf2d58..14c63c559b 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -75,6 +75,8 @@ public:
FragmentMgr(ExecEnv* exec_env);
~FragmentMgr() override;
+ void stop();
+
// execute one plan fragment
Status exec_plan_fragment(const TExecPlanFragmentParams& params);
diff --git a/be/src/runtime/frontend_info.h b/be/src/runtime/frontend_info.h
index c16d63096f..a7e4b3f999 100644
--- a/be/src/runtime/frontend_info.h
+++ b/be/src/runtime/frontend_info.h
@@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+#pragma once
#include <gen_cpp/HeartbeatService_types.h>
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 5c9b3c1637..94c0dba30a 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -362,7 +362,14 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) :
_exec_env(exec_env) {
.build(&_insert_into_thread_pool);
}
-GroupCommitMgr::~GroupCommitMgr() {}
+GroupCommitMgr::~GroupCommitMgr() {
+ LOG(INFO) << "GroupCommitMgr is destoried";
+}
+
+void GroupCommitMgr::stop() {
+ _insert_into_thread_pool->shutdown();
+ LOG(INFO) << "GroupCommitMgr is stopped";
+}
Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
const TDescriptorTable& tdesc_tbl,
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 0d5797ed96..1d124009d1 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -109,6 +109,8 @@ public:
GroupCommitMgr(ExecEnv* exec_env);
virtual ~GroupCommitMgr();
+ void stop();
+
// insert into
Status group_commit_insert(int64_t table_id, const TPlan& plan,
const TDescriptorTable& desc_tbl,
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 9987a34d5b..1aaf8772b8 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -73,13 +73,16 @@ LoadChannelMgr::LoadChannelMgr() :
_stop_background_threads_latch(1) {
}
LoadChannelMgr::~LoadChannelMgr() {
+ delete _last_success_channel;
+}
+
+void LoadChannelMgr::stop() {
DEREGISTER_HOOK_METRIC(load_channel_count);
DEREGISTER_HOOK_METRIC(load_channel_mem_consumption);
_stop_background_threads_latch.count_down();
if (_load_channels_clean_thread) {
_load_channels_clean_thread->join();
}
- delete _last_success_channel;
}
Status LoadChannelMgr::init(int64_t process_mem_limit) {
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index 3c094a4251..a77c0b0cc2 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -63,6 +63,8 @@ public:
// cancel all tablet stream for 'load_id' load
Status cancel(const PTabletWriterCancelRequest& request);
+ void stop();
+
private:
Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool&
is_eof,
const UniqueId& load_id, const
PTabletWriterAddBlockRequest& request);
diff --git a/be/src/runtime/load_path_mgr.cpp b/be/src/runtime/load_path_mgr.cpp
index c9a5f18183..e241eeafea 100644
--- a/be/src/runtime/load_path_mgr.cpp
+++ b/be/src/runtime/load_path_mgr.cpp
@@ -56,7 +56,7 @@ LoadPathMgr::LoadPathMgr(ExecEnv* exec_env)
_error_path_next_shard(0),
_stop_background_threads_latch(1) {}
-LoadPathMgr::~LoadPathMgr() {
+void LoadPathMgr::stop() {
_stop_background_threads_latch.count_down();
if (_clean_thread) {
_clean_thread->join();
diff --git a/be/src/runtime/load_path_mgr.h b/be/src/runtime/load_path_mgr.h
index 73703517c6..de443f059b 100644
--- a/be/src/runtime/load_path_mgr.h
+++ b/be/src/runtime/load_path_mgr.h
@@ -39,9 +39,10 @@ class Thread;
class LoadPathMgr {
public:
LoadPathMgr(ExecEnv* env);
- ~LoadPathMgr();
+ ~LoadPathMgr() = default;
Status init();
+ void stop();
Status allocate_dir(const std::string& db, const std::string& label,
std::string* prefix);
diff --git a/be/src/runtime/memory/cache_manager.h
b/be/src/runtime/memory/cache_manager.h
index fd7d5875b0..8fdce10d69 100644
--- a/be/src/runtime/memory/cache_manager.h
+++ b/be/src/runtime/memory/cache_manager.h
@@ -17,6 +17,7 @@
#pragma once
+#include "runtime/exec_env.h"
#include "runtime/memory/cache_policy.h"
#include "util/runtime_profile.h"
@@ -25,12 +26,11 @@ namespace doris {
// Hold the list of all caches, for prune when memory not enough or timing.
class CacheManager {
public:
- static void create_global_instance() {
- DCHECK(_s_instance == nullptr);
- static CacheManager instance;
- _s_instance = &instance;
+ static CacheManager* create_global_instance() {
+ CacheManager* res = new CacheManager();
+ return res;
}
- static CacheManager* instance() { return _s_instance; }
+ static CacheManager* instance() { return
ExecEnv::GetInstance()->get_cache_manager(); }
std::list<CachePolicy*>::iterator register_cache(CachePolicy* cache) {
std::lock_guard<std::mutex> l(_caches_lock);
@@ -55,8 +55,6 @@ public:
void clear_once(CachePolicy::CacheType type);
private:
- static inline CacheManager* _s_instance = nullptr;
-
std::mutex _caches_lock;
std::list<CachePolicy*> _caches;
};
diff --git a/be/src/runtime/result_buffer_mgr.cpp
b/be/src/runtime/result_buffer_mgr.cpp
index 32eeb702df..a3b99300f2 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -47,7 +47,7 @@ ResultBufferMgr::ResultBufferMgr() :
_stop_background_threads_latch(1) {
});
}
-ResultBufferMgr::~ResultBufferMgr() {
+void ResultBufferMgr::stop() {
DEREGISTER_HOOK_METRIC(result_buffer_block_count);
_stop_background_threads_latch.count_down();
if (_clean_thread) {
diff --git a/be/src/runtime/result_buffer_mgr.h
b/be/src/runtime/result_buffer_mgr.h
index f164a14b93..8c9b621968 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -47,9 +47,12 @@ class Thread;
class ResultBufferMgr {
public:
ResultBufferMgr();
- ~ResultBufferMgr();
+ ~ResultBufferMgr() = default;
// init Result Buffer Mgr, start cancel thread
Status init();
+
+ void stop();
+
// create one result sender for this query_id
// the returned sender do not need release
// sender is not used when call cancel or unregister
diff --git a/be/src/runtime/routine_load/data_consumer_pool.h
b/be/src/runtime/routine_load/data_consumer_pool.h
index 15baa2be6e..25dbc57fb7 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.h
+++ b/be/src/runtime/routine_load/data_consumer_pool.h
@@ -41,7 +41,9 @@ public:
DataConsumerPool(int64_t max_pool_size)
: _max_pool_size(max_pool_size), _stop_background_threads_latch(1)
{}
- ~DataConsumerPool() {
+ ~DataConsumerPool() = default;
+
+ void stop() {
_stop_background_threads_latch.count_down();
if (_clean_idle_consumer_thread) {
_clean_idle_consumer_thread->join();
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 72285930b4..e5c48f78d9 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -76,12 +76,15 @@ RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv*
exec_env)
}
RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
+ LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
+ _task_map.clear();
+}
+
+void RoutineLoadTaskExecutor::stop() {
DEREGISTER_HOOK_METRIC(routine_load_task_count);
_thread_pool.shutdown();
_thread_pool.join();
-
- LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
- _task_map.clear();
+ _data_consumer_pool.stop();
}
// Create a temp StreamLoadContext and set some kafka connection info in it.
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h
b/be/src/runtime/routine_load/routine_load_task_executor.h
index 90c1a06400..6714ce6902 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -51,6 +51,8 @@ public:
~RoutineLoadTaskExecutor();
+ void stop();
+
// submit a routine load task
Status submit_task(const TRoutineLoadTask& task);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp
b/be/src/runtime/task_group/task_group_manager.cpp
index 6ce6d31604..179bf8911a 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -28,9 +28,6 @@
namespace doris::taskgroup {
-TaskGroupManager::TaskGroupManager() = default;
-TaskGroupManager::~TaskGroupManager() = default;
-
TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo&
task_group_info) {
{
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
diff --git a/be/src/runtime/task_group/task_group_manager.h
b/be/src/runtime/task_group/task_group_manager.h
index 375208dc6e..baa6579b15 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -29,8 +29,8 @@ namespace taskgroup {
class TaskGroupManager {
public:
- TaskGroupManager();
- ~TaskGroupManager();
+ TaskGroupManager() = default;
+ ~TaskGroupManager() = default;
TaskGroupPtr get_or_create_task_group(const TaskGroupInfo&
task_group_info);
diff --git a/be/src/runtime/user_function_cache.cpp
b/be/src/runtime/user_function_cache.cpp
index 39507ec222..9f07ba5902 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -39,6 +39,7 @@
#include "http/http_client.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
#include "util/dynamic_util.h"
#include "util/md5.h"
#include "util/spinlock.h"
@@ -122,8 +123,7 @@ UserFunctionCache::~UserFunctionCache() {
}
UserFunctionCache* UserFunctionCache::instance() {
- static UserFunctionCache s_cache;
- return &s_cache;
+ return ExecEnv::GetInstance()->user_function_cache();
}
Status UserFunctionCache::init(const std::string& lib_dir) {
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index f35c49cbd2..57219f584e 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -86,8 +86,16 @@ Status BRpcService::start(int port, int num_threads) {
}
void BRpcService::join() {
- _server->Stop(1000);
- _server->Join();
+ int stop_succeed = _server->Stop(1000);
+
+ if (stop_succeed == 0) {
+ _server->Join();
+ } else {
+ LOG(WARNING) << "Failed to stop brpc service, "
+ << "not calling brpc server join since it will never
retrun."
+ << "maybe something bad will happen, let us know if you
meet something error.";
+ }
+
_server->ClearServices();
}
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index fd55c5a211..abfa7913e7 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -57,7 +57,6 @@
#include "common/daemon.h"
#include "common/logging.h"
#include "common/phdr_cache.h"
-#include "common/resource_tls.h"
#include "common/signal_handler.h"
#include "common/status.h"
#include "io/cache/block/block_file_cache_factory.h"
@@ -449,49 +448,6 @@ int main(int argc, char** argv) {
// Or our own sig-handler for SIGINT & SIGTERM will not be chained ...
// https://www.oracle.com/java/technologies/javase/signals.html
doris::init_signals();
-
- // Load file cache before starting up daemon threads to make sure
StorageEngine is read.
- if (doris::config::enable_file_cache) {
- doris::io::IFileCache::init();
- std::unordered_set<std::string> cache_path_set;
- std::vector<doris::CachePath> cache_paths;
- olap_res =
doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
- if (!olap_res) {
- LOG(FATAL) << "parse config file cache path failed, path="
- << doris::config::file_cache_path;
- exit(-1);
- }
-
- std::unique_ptr<doris::ThreadPool> file_cache_init_pool;
- doris::ThreadPoolBuilder("FileCacheInitThreadPool")
- .set_min_threads(cache_paths.size())
- .set_max_threads(cache_paths.size())
- .build(&file_cache_init_pool);
-
- std::list<doris::Status> cache_status;
- for (auto& cache_path : cache_paths) {
- if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
- LOG(WARNING) << fmt::format("cache path {} is duplicate",
cache_path.path);
- continue;
- }
-
- RETURN_IF_ERROR(file_cache_init_pool->submit_func(
- std::bind(&doris::io::FileCacheFactory::create_file_cache,
- &(doris::io::FileCacheFactory::instance()),
cache_path.path,
- cache_path.init_settings(),
&(cache_status.emplace_back()))));
-
- cache_path_set.emplace(cache_path.path);
- }
-
- file_cache_init_pool->wait();
- for (const auto& status : cache_status) {
- if (!status.ok()) {
- LOG(FATAL) << "failed to init file cache, err: " << status;
- exit(-1);
- }
- }
- }
-
// ATTN: MUST init before `ExecEnv`, `StorageEngine` and other daemon
services
//
// Daemon ───┬──► StorageEngine ──► ExecEnv ──► Disk/Mem/CpuInfo
@@ -501,7 +457,6 @@ int main(int argc, char** argv) {
doris::CpuInfo::init();
doris::DiskInfo::init();
doris::MemInfo::init();
-
doris::UserFunctionCache::instance()->init(doris::config::user_function_dir);
LOG(INFO) << doris::CpuInfo::debug_string();
LOG(INFO) << doris::DiskInfo::debug_string();
@@ -511,41 +466,17 @@ int main(int argc, char** argv) {
// will work only after additional call of this function.
// rewrites dl_iterate_phdr will cause Jemalloc to fail to run after
enable profile. see #
// updatePHDRCache();
-
- doris::ResourceTls::init();
if (!doris::BackendOptions::init()) {
exit(-1);
}
// init exec env
- auto exec_env = doris::ExecEnv::GetInstance();
- doris::ExecEnv::init(exec_env, paths);
- doris::TabletSchemaCache::create_global_schema_cache();
-
- // init s3 write buffer pool
- doris::io::S3FileBufferPool* s3_buffer_pool =
doris::io::S3FileBufferPool::GetInstance();
- s3_buffer_pool->init(doris::config::s3_write_buffer_whole_size,
- doris::config::s3_write_buffer_size,
- exec_env->buffered_reader_prefetch_thread_pool());
-
- // init and open storage engine
- doris::EngineOptions options;
- options.store_paths = paths;
- options.backend_uid = doris::UniqueId::gen_uid();
- std::unique_ptr<doris::StorageEngine> engine;
- auto st = doris::StorageEngine::open(options, &engine);
- if (!st.ok()) {
- LOG(FATAL) << "fail to open StorageEngine, res=" << st;
+ auto exec_env(doris::ExecEnv::GetInstance());
+ status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths);
+ if (status != Status::OK()) {
+ LOG(FATAL) << "failed to init doris storage engine, res=" << status;
exit(-1);
}
- engine->set_heartbeat_flags(exec_env->heartbeat_flags());
-
- // start all background threads of storage engine.
- // SHOULD be called after exec env is initialized.
- EXIT_IF_ERROR(engine->start_bg_threads());
-
- doris::Daemon daemon;
- daemon.start();
doris::telemetry::init_tracer();
@@ -563,8 +494,9 @@ int main(int argc, char** argv) {
}
// 2. bprc service
- doris::BRpcService brpc_service(exec_env);
- status = brpc_service.start(doris::config::brpc_port,
doris::config::brpc_num_threads);
+ std::unique_ptr<doris::BRpcService> brpc_service =
+ std::make_unique<doris::BRpcService>(exec_env);
+ status = brpc_service->start(doris::config::brpc_port,
doris::config::brpc_num_threads);
if (!status.ok()) {
LOG(ERROR) << "BRPC service did not start correctly, exiting";
doris::shutdown_logging();
@@ -572,9 +504,9 @@ int main(int argc, char** argv) {
}
// 3. http service
- doris::HttpService http_service(exec_env, doris::config::webserver_port,
- doris::config::webserver_num_workers);
- status = http_service.start();
+ std::unique_ptr<doris::HttpService> http_service =
std::make_unique<doris::HttpService>(
+ exec_env, doris::config::webserver_port,
doris::config::webserver_num_workers);
+ status = http_service->start();
if (!status.ok()) {
LOG(ERROR) << "Doris Be http service did not start correctly, exiting";
doris::shutdown_logging();
@@ -605,6 +537,10 @@ int main(int argc, char** argv) {
std::shared_ptr<doris::flight::FlightSqlServer> flight_server =
std::move(doris::flight::FlightSqlServer::create()).ValueOrDie();
status = flight_server->init(doris::config::arrow_flight_port);
+
+ // 6. start daemon thread to do clean or gc jobs
+ doris::Daemon daemon;
+ daemon.start();
if (!status.ok()) {
LOG(ERROR) << "Arrow Flight Service did not start correctly, exiting, "
<< status.to_string();
@@ -618,10 +554,26 @@ int main(int argc, char** argv) {
#endif
sleep(3);
}
-
+ LOG(INFO) << "Doris main exiting.";
// For graceful shutdown, need to wait for all running queries to stop
exec_env->wait_for_all_tasks_done();
-
+ daemon.stop();
+ flight_server.reset();
+ LOG(INFO) << "Flight server stopped.";
+ heartbeat_thrift_server->stop();
+ heartbeat_thrift_server.reset(nullptr);
+ LOG(INFO) << "Heartbeat server stopped";
+ // TODO(zhiqiang): http_service
+ http_service->stop();
+ http_service.reset(nullptr);
+ LOG(INFO) << "Http service stopped";
+ be_server->stop();
+ be_server.reset(nullptr);
+ LOG(INFO) << "Be server stopped";
+ brpc_service.reset(nullptr);
+ LOG(INFO) << "Brpc service stopped";
+ exec_env->destroy();
+ LOG(INFO) << "Doris main exited.";
return 0;
}
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index dcc3082972..fd8c640d20 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -239,8 +239,12 @@ Status HttpService::start() {
}
void HttpService::stop() {
+ if (stopped) {
+ return;
+ }
_ev_http_server->stop();
_pool.clear();
+ stopped = true;
}
} // namespace doris
diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h
index dea8c4141c..05d36fdd75 100644
--- a/be/src/service/http_service.h
+++ b/be/src/service/http_service.h
@@ -43,6 +43,8 @@ private:
std::unique_ptr<EvHttpServer> _ev_http_server;
std::unique_ptr<WebPageHandler> _web_page_handler;
+
+ bool stopped = false;
};
} // namespace doris
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index df762226a2..a1cf85f05a 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -33,6 +33,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_schema.h"
+#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/key_util.h"
#include "util/runtime_profile.h"
@@ -101,15 +102,12 @@ int64_t Reusable::mem_size() const {
return _mem_size;
}
-LookupConnectionCache* LookupConnectionCache::_s_instance = nullptr;
-void LookupConnectionCache::create_global_instance(size_t capacity) {
- DCHECK(_s_instance == nullptr);
- static LookupConnectionCache instance(capacity);
- _s_instance = &instance;
+LookupConnectionCache* LookupConnectionCache::create_global_instance(size_t
capacity) {
+ DCHECK(ExecEnv::GetInstance()->get_lookup_connection_cache() == nullptr);
+ LookupConnectionCache* res = new LookupConnectionCache(capacity);
+ return res;
}
-RowCache* RowCache::_s_instance = nullptr;
-
RowCache::RowCache(int64_t capacity, int num_shards) {
// Create Row Cache
_cache = std::unique_ptr<Cache>(
@@ -117,14 +115,14 @@ RowCache::RowCache(int64_t capacity, int num_shards) {
}
// Create global instance of this class
-void RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) {
- DCHECK(_s_instance == nullptr);
- static RowCache instance(capacity, num_shards);
- _s_instance = &instance;
+RowCache* RowCache::create_global_cache(int64_t capacity, uint32_t num_shards)
{
+ DCHECK(ExecEnv::GetInstance()->get_row_cache() == nullptr);
+ RowCache* res = new RowCache(capacity, num_shards);
+ return res;
}
RowCache* RowCache::instance() {
- return _s_instance;
+ return ExecEnv::GetInstance()->get_row_cache();
}
bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) {
diff --git a/be/src/service/point_query_executor.h
b/be/src/service/point_query_executor.h
index 180af54fdd..4e51217c3b 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -46,6 +46,7 @@
#include "olap/tablet.h"
#include "olap/utils.h"
#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
#include "util/mysql_global.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
@@ -161,7 +162,7 @@ public:
};
// Create global instance of this class
- static void create_global_cache(int64_t capacity, uint32_t num_shards =
kDefaultNumShards);
+ static RowCache* create_global_cache(int64_t capacity, uint32_t num_shards
= kDefaultNumShards);
static RowCache* instance();
@@ -183,7 +184,6 @@ public:
private:
static constexpr uint32_t kDefaultNumShards = 128;
RowCache(int64_t capacity, int num_shards = kDefaultNumShards);
- static RowCache* _s_instance;
std::unique_ptr<Cache> _cache = nullptr;
};
@@ -191,9 +191,11 @@ private:
// One connection per stmt perf uuid
class LookupConnectionCache : public LRUCachePolicy {
public:
- static LookupConnectionCache* instance() { return _s_instance; }
+ static LookupConnectionCache* instance() {
+ return ExecEnv::GetInstance()->get_lookup_connection_cache();
+ }
- static void create_global_instance(size_t capacity);
+ static LookupConnectionCache* create_global_instance(size_t capacity);
private:
friend class PointQueryExecutor;
@@ -240,8 +242,6 @@ private:
struct CacheValue : public LRUCacheValueBase {
std::shared_ptr<Reusable> item = nullptr;
};
-
- static LookupConnectionCache* _s_instance;
};
struct Metrics {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 87903e480a..439a843c3a 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -61,6 +61,17 @@ ScannerScheduler::~ScannerScheduler() {
return;
}
+ for (int i = 0; i < QUEUE_NUM; i++) {
+ delete _pending_queues[i];
+ }
+ delete[] _pending_queues;
+}
+
+void ScannerScheduler::stop() {
+ if (!_is_init) {
+ return;
+ }
+
for (int i = 0; i < QUEUE_NUM; i++) {
_pending_queues[i]->shutdown();
}
@@ -80,13 +91,10 @@ ScannerScheduler::~ScannerScheduler() {
_limited_scan_thread_pool->wait();
_group_local_scan_thread_pool->wait();
- for (int i = 0; i < QUEUE_NUM; i++) {
- delete _pending_queues[i];
- }
- delete[] _pending_queues;
+ LOG(INFO) << "ScannerScheduler stopped";
}
-Status ScannerScheduler::init(ExecEnv* env) {
+Status ScannerScheduler::init() {
// 1. scheduling thread pool and scheduling queues
ThreadPoolBuilder("SchedulingThreadPool")
.set_min_threads(QUEUE_NUM)
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index e669fd9b77..366275eb41 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -64,10 +64,12 @@ public:
ScannerScheduler();
~ScannerScheduler();
- Status init(ExecEnv* env);
+ [[nodiscard]] Status init();
[[nodiscard]] Status submit(ScannerContext* ctx);
+ void stop();
+
std::unique_ptr<ThreadPoolToken>
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
int
max_concurrency);
taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() {
diff --git a/be/test/common/resource_tls_test.cpp
b/be/test/common/resource_tls_test.cpp
deleted file mode 100644
index e09227eb03..0000000000
--- a/be/test/common/resource_tls_test.cpp
+++ /dev/null
@@ -1,49 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "common/resource_tls.h"
-
-#include <gtest/gtest-message.h>
-#include <gtest/gtest-test-part.h>
-
-#include <memory>
-
-#include "gen_cpp/Types_types.h"
-#include "gtest/gtest_pred_impl.h"
-
-namespace doris {
-
-class ResourceTlsTest : public testing::Test {};
-
-TEST_F(ResourceTlsTest, EmptyTest) {
- EXPECT_TRUE(ResourceTls::get_resource_tls() == nullptr);
- EXPECT_TRUE(ResourceTls::set_resource_tls((TResourceInfo*)1) != 0);
-}
-
-TEST_F(ResourceTlsTest, NormalTest) {
- ResourceTls::init();
- EXPECT_TRUE(ResourceTls::get_resource_tls() == nullptr);
- TResourceInfo* info = new TResourceInfo();
- info->user = "testUser";
- info->group = "testGroup";
- EXPECT_TRUE(ResourceTls::set_resource_tls(info) == 0);
- TResourceInfo* getInfo = ResourceTls::get_resource_tls();
- EXPECT_STREQ("testUser", getInfo->user.c_str());
- EXPECT_STREQ("testGroup", getInfo->group.c_str());
-}
-
-} // namespace doris
diff --git a/be/test/olap/delete_bitmap_calculator_test.cpp
b/be/test/olap/delete_bitmap_calculator_test.cpp
index 6e842edc36..53bdb9c0fb 100644
--- a/be/test/olap/delete_bitmap_calculator_test.cpp
+++ b/be/test/olap/delete_bitmap_calculator_test.cpp
@@ -40,6 +40,7 @@
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/tablet_schema_helper.h"
+#include "runtime/exec_env.h"
namespace doris {
using namespace ErrorCode;
@@ -73,7 +74,7 @@ public:
EXPECT_TRUE(io::global_local_filesystem()->delete_and_create_directory(kSegmentDir).ok());
doris::EngineOptions options;
k_engine = new StorageEngine(options);
- StorageEngine::_s_instance = k_engine;
+ ExecEnv::GetInstance()->set_storage_engine(k_engine);
}
void TearDown() override {
@@ -82,6 +83,7 @@ public:
k_engine->stop();
delete k_engine;
k_engine = nullptr;
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
}
}
diff --git a/be/test/olap/delete_handler_test.cpp
b/be/test/olap/delete_handler_test.cpp
index 605e3e389c..b338320d0f 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -29,6 +29,7 @@
#include <cstdlib>
#include <iostream>
+#include <memory>
#include <string>
#include <vector>
@@ -46,6 +47,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
+#include "runtime/exec_env.h"
#include "util/cpu_info.h"
using namespace std;
@@ -78,8 +80,10 @@ static void set_up() {
doris::EngineOptions options;
options.store_paths = paths;
- Status s = doris::StorageEngine::open(options, &k_engine);
+ k_engine = std::make_unique<StorageEngine>(options);
+ Status s = k_engine->open();
EXPECT_TRUE(s.ok()) << s.to_string();
+ ExecEnv::GetInstance()->set_storage_engine(k_engine.get());
}
static void tear_down() {
@@ -90,6 +94,7 @@ static void tear_down() {
EXPECT_TRUE(io::global_local_filesystem()
->delete_directory(string(getenv("DORIS_HOME")) + "/"
+ UNUSED_PREFIX)
.ok());
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
k_engine.reset();
}
diff --git a/be/test/olap/delta_writer_test.cpp
b/be/test/olap/delta_writer_test.cpp
index a8dfa4a760..6fc57fd15d 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -79,17 +79,20 @@ static void set_up() {
doris::EngineOptions options;
options.store_paths = paths;
- Status s = doris::StorageEngine::open(options, &k_engine);
+ k_engine = std::make_unique<StorageEngine>(options);
+ Status s = k_engine->open();
EXPECT_TRUE(s.ok()) << s.to_string();
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
+ exec_env->set_storage_engine(k_engine.get());
k_engine->start_bg_threads();
}
static void tear_down() {
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
exec_env->set_memtable_memory_limiter(nullptr);
+ exec_env->set_storage_engine(nullptr);
k_engine.reset();
EXPECT_EQ(system("rm -rf ./data_test"), 0);
io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME"))
+ "/" +
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp
b/be/test/olap/engine_storage_migration_task_test.cpp
index 706cb74979..037e2a70c7 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -77,16 +77,19 @@ static void set_up() {
doris::EngineOptions options;
options.store_paths = paths;
- Status s = doris::StorageEngine::open(options, &k_engine);
+ k_engine = std::make_unique<StorageEngine>(options);
+ Status s = k_engine->open();
EXPECT_TRUE(s.ok()) << s.to_string();
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
- k_engine->start_bg_threads();
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
+ exec_env->set_storage_engine(k_engine.get());
+ k_engine->start_bg_threads();
}
static void tear_down() {
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
exec_env->set_memtable_memory_limiter(nullptr);
+ exec_env->set_storage_engine(nullptr);
k_engine.reset();
EXPECT_EQ(system("rm -rf ./data_test_1"), 0);
EXPECT_EQ(system("rm -rf ./data_test_2"), 0);
diff --git a/be/test/olap/memtable_flush_executor_test.cpp
b/be/test/olap/memtable_flush_executor_test.cpp
index efe95e36a3..283b010b76 100644
--- a/be/test/olap/memtable_flush_executor_test.cpp
+++ b/be/test/olap/memtable_flush_executor_test.cpp
@@ -55,13 +55,15 @@ void set_up() {
doris::EngineOptions options;
options.store_paths = paths;
- Status s = doris::StorageEngine::open(options, &k_engine);
+ k_engine = std::make_unique<StorageEngine>(options);
+ Status s = k_engine->open();
EXPECT_TRUE(s.ok()) << s.to_string();
-
+ ExecEnv::GetInstance()->set_storage_engine(k_engine.get());
k_flush_executor = k_engine->memtable_flush_executor();
}
void tear_down() {
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
k_engine.reset();
system("rm -rf ./flush_test");
EXPECT_TRUE(io::global_local_filesystem()
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp
b/be/test/olap/memtable_memory_limiter_test.cpp
index 187fc09cb4..22a8421fd9 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -85,15 +85,23 @@ protected:
doris::EngineOptions options;
options.store_paths = paths;
- Status s = doris::StorageEngine::open(options, &_engine);
+ _engine = std::make_unique<StorageEngine>(options);
+ Status st = _engine->open();
+ EXPECT_TRUE(st.ok()) << st.to_string();
+
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
- _engine->start_bg_threads();
+ // ExecEnv's storage_engine will be read by storage_engine's other
operations.
+ // So we must do this before storage engine's other operation.
+ exec_env->set_storage_engine(_engine.get());
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
+ _engine->start_bg_threads();
}
void TearDown() override {
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
exec_env->set_memtable_memory_limiter(nullptr);
+ exec_env->set_storage_engine(nullptr);
+ _engine.reset(nullptr);
EXPECT_EQ(system("rm -rf ./data_test"), 0);
io::global_local_filesystem()->delete_directory(std::string(getenv("DORIS_HOME"))
+ "/" +
UNUSED_PREFIX);
diff --git a/be/test/olap/ordered_data_compaction_test.cpp
b/be/test/olap/ordered_data_compaction_test.cpp
index ab0562a3ac..b53689c229 100644
--- a/be/test/olap/ordered_data_compaction_test.cpp
+++ b/be/test/olap/ordered_data_compaction_test.cpp
@@ -61,6 +61,7 @@
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
+#include "runtime/exec_env.h"
#include "util/uid_util.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
@@ -89,8 +90,7 @@ protected:
_data_dir->update_capacity();
doris::EngineOptions options;
k_engine = new StorageEngine(options);
- StorageEngine::_s_instance = k_engine;
-
+ ExecEnv::GetInstance()->set_storage_engine(k_engine);
config::enable_ordered_data_compaction = true;
config::ordered_data_compaction_min_segment_size = 10;
}
@@ -100,6 +100,7 @@ protected:
k_engine->stop();
delete k_engine;
k_engine = nullptr;
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
}
}
diff --git a/be/test/olap/rowid_conversion_test.cpp
b/be/test/olap/rowid_conversion_test.cpp
index 95b34cf0f8..6592cb53dd 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -54,6 +54,7 @@
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
+#include "runtime/exec_env.h"
#include "util/uid_util.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
@@ -77,7 +78,7 @@ protected:
.ok());
doris::EngineOptions options;
k_engine = new StorageEngine(options);
- StorageEngine::_s_instance = k_engine;
+ ExecEnv::GetInstance()->set_storage_engine(k_engine);
}
void TearDown() override {
@@ -86,6 +87,7 @@ protected:
k_engine->stop();
delete k_engine;
k_engine = nullptr;
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
}
}
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp
b/be/test/olap/rowset/beta_rowset_test.cpp
index 5e259745d6..81e587370a 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -100,13 +100,18 @@ public:
doris::EngineOptions options;
options.store_paths = paths;
- Status s = doris::StorageEngine::open(options, &k_engine);
+ k_engine = std::make_unique<StorageEngine>(options);
+ Status s = k_engine->open();
EXPECT_TRUE(s.ok()) << s.to_string();
+ ExecEnv::GetInstance()->set_storage_engine(k_engine.get());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(kTestDir).ok());
}
- static void TearDownTestSuite() { k_engine.reset(); }
+ static void TearDownTestSuite() {
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
+ k_engine.reset();
+ }
protected:
OlapReaderStatistics _stats;
diff --git a/be/test/olap/rowset/rowset_meta_manager_test.cpp
b/be/test/olap/rowset/rowset_meta_manager_test.cpp
index a747d1fa2c..044b95ff97 100644
--- a/be/test/olap/rowset/rowset_meta_manager_test.cpp
+++ b/be/test/olap/rowset/rowset_meta_manager_test.cpp
@@ -36,6 +36,7 @@
#include "olap/olap_meta.h"
#include "olap/options.h"
#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
#include "util/uid_util.h"
using ::testing::_;
@@ -60,6 +61,7 @@ public:
if (k_engine == nullptr) {
k_engine = new StorageEngine(options);
}
+ ExecEnv::GetInstance()->set_storage_engine(k_engine);
std::string meta_path = "./meta";
EXPECT_TRUE(std::filesystem::create_directory(meta_path));
@@ -83,6 +85,7 @@ public:
virtual void TearDown() {
SAFE_DELETE(_meta);
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
SAFE_DELETE(k_engine);
EXPECT_TRUE(std::filesystem::remove_all("./meta"));
LOG(INFO) << "TearDown";
diff --git a/be/test/olap/tablet_cooldown_test.cpp
b/be/test/olap/tablet_cooldown_test.cpp
index 03fa82c3cc..e475791da3 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -248,13 +248,17 @@ public:
EngineOptions options;
options.store_paths = paths;
- doris::StorageEngine::open(options, &k_engine);
+ k_engine = std::make_unique<StorageEngine>(options);
+ auto st = k_engine->open();
+ EXPECT_TRUE(st.ok()) << st.to_string();
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
+ exec_env->set_storage_engine(k_engine.get());
}
static void TearDownTestSuite() {
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+ exec_env->set_storage_engine(nullptr);
exec_env->set_memtable_memory_limiter(nullptr);
k_engine.reset();
}
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 5954d0329f..a9a4bb940d 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -41,6 +41,7 @@
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
+#include "runtime/exec_env.h"
#include "util/uid_util.h"
using ::testing::_;
@@ -66,6 +67,7 @@ public:
// won't open engine, options.path is needless
options.backend_uid = UniqueId::gen_uid();
k_engine = new StorageEngine(options);
+ ExecEnv::GetInstance()->set_storage_engine(k_engine);
_data_dir = new DataDir(_engine_data_path, 1000000000);
_data_dir->init();
_tablet_mgr = k_engine->tablet_manager();
@@ -77,6 +79,7 @@ public:
if (k_engine != nullptr) {
k_engine->stop();
}
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
SAFE_DELETE(k_engine);
_tablet_mgr = nullptr;
}
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index 838db9b0d6..3fc32d3b3d 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -34,6 +34,7 @@
#include "olap/storage_policy.h"
#include "olap/tablet_meta.h"
#include "olap/utils.h"
+#include "runtime/exec_env.h"
#include "testutil/mock_rowset.h"
#include "util/time.h"
#include "util/uid_util.h"
@@ -86,7 +87,7 @@ public:
doris::EngineOptions options;
k_engine = new StorageEngine(options);
- StorageEngine::_s_instance = k_engine;
+ ExecEnv::GetInstance()->set_storage_engine(k_engine);
}
void TearDown() override {
@@ -95,6 +96,7 @@ public:
k_engine->stop();
delete k_engine;
k_engine = nullptr;
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
}
}
diff --git a/be/test/olap/txn_manager_test.cpp
b/be/test/olap/txn_manager_test.cpp
index 2d27576b5a..fe05e206c7 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -113,6 +113,7 @@ public:
if (k_engine == nullptr) {
k_engine = new StorageEngine(options);
}
+ ExecEnv::GetInstance()->set_storage_engine(k_engine);
std::string meta_path = "./meta";
std::filesystem::remove_all("./meta");
diff --git a/be/test/runtime/external_scan_context_mgr_test.cpp
b/be/test/runtime/external_scan_context_mgr_test.cpp
index ba4febda10..c313916476 100644
--- a/be/test/runtime/external_scan_context_mgr_test.cpp
+++ b/be/test/runtime/external_scan_context_mgr_test.cpp
@@ -38,13 +38,11 @@ public:
_exec_env._fragment_mgr = fragment_mgr;
_exec_env._result_queue_mgr = result_queue_mgr;
}
- virtual ~ExternalScanContextMgrTest() {
- delete _exec_env._fragment_mgr;
- delete _exec_env._result_queue_mgr;
- }
+ ~ExternalScanContextMgrTest() = default;
protected:
- virtual void SetUp() {}
+ void SetUp() override { _exec_env.set_ready(); }
+ void TearDown() override { _exec_env.destroy(); }
private:
ExecEnv _exec_env;
diff --git a/be/test/runtime/load_stream_test.cpp
b/be/test/runtime/load_stream_test.cpp
index dae8b93e19..f3c9b55cd7 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -29,6 +29,7 @@
#include <unistd.h>
#include <functional>
+#include <memory>
#include "common/config.h"
#include "common/status.h"
@@ -582,10 +583,10 @@ public:
doris::EngineOptions options;
options.store_paths = paths;
- Status s = doris::StorageEngine::open(options, &k_engine);
+ k_engine = std::make_unique<StorageEngine>(options);
+ Status s = k_engine->open();
EXPECT_TRUE(s.ok()) << s.to_string();
-
- _env = doris::ExecEnv::GetInstance();
+ doris::ExecEnv::GetInstance()->set_storage_engine(k_engine.get());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(zTestDir).ok());
@@ -611,6 +612,7 @@ public:
}
void TearDown() override {
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
k_engine.reset();
_server->Stop(1000);
_load_stream_mgr.reset();
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp
b/be/test/runtime/routine_load_task_executor_test.cpp
index f1088300cd..8a8dcc4d67 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -93,7 +93,6 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
task.__set_kafka_load_info(k_info);
RoutineLoadTaskExecutor executor(&_env);
-
// submit task
Status st;
st = executor.submit_task(task);
@@ -116,6 +115,8 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
task.__set_kafka_load_info(k_info);
st = executor.submit_task(task);
EXPECT_TRUE(st.ok());
+
+ executor.stop();
}
-} // namespace doris
+} // namespace doris
\ No newline at end of file
diff --git a/be/test/testutil/run_all_tests.cpp
b/be/test/testutil/run_all_tests.cpp
index 2e735ce8fa..de062fa930 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -39,10 +39,13 @@
int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->init_mem_tracker();
doris::thread_context()->thread_mem_tracker_mgr->init();
- doris::CacheManager::create_global_instance();
- doris::TabletSchemaCache::create_global_schema_cache();
- doris::StoragePageCache::create_global_cache(1 << 30, 10, 0);
- doris::SegmentLoader::create_global_instance(1000);
+
doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());
+ doris::ExecEnv::GetInstance()->set_tablet_schema_cache(
+ doris::TabletSchemaCache::create_global_schema_cache());
+ doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->start();
+ doris::ExecEnv::GetInstance()->set_storage_page_cache(
+ doris::StoragePageCache::create_global_cache(1 << 30, 10, 0));
+ doris::ExecEnv::GetInstance()->set_segment_loader(new
doris::SegmentLoader(1000));
std::string conf = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
if (!doris::config::init(conf.c_str(), false)) {
fprintf(stderr, "error read config file. \n");
@@ -54,5 +57,7 @@ int main(int argc, char** argv) {
doris::DiskInfo::init();
doris::MemInfo::init();
doris::BackendOptions::init();
- return RUN_ALL_TESTS();
+ int res = RUN_ALL_TESTS();
+ doris::ExecEnv::GetInstance()->get_tablet_schema_cache()->stop();
+ return res;
}
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp
b/be/test/vec/olap/vertical_compaction_test.cpp
index 14a202845a..74f11d7e7d 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -61,6 +61,7 @@
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
+#include "runtime/exec_env.h"
#include "util/uid_util.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
@@ -93,7 +94,7 @@ protected:
doris::EngineOptions options;
k_engine = new StorageEngine(options);
- StorageEngine::_s_instance = k_engine;
+ ExecEnv::GetInstance()->set_storage_engine(k_engine);
}
void TearDown() override {
SAFE_DELETE(_data_dir);
@@ -102,6 +103,7 @@ protected:
k_engine->stop();
delete k_engine;
k_engine = nullptr;
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]