This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 643db55a78 [improvement](thread) stop threads when BE exit gracefully (#19506)
643db55a78 is described below
commit 643db55a78d2b46515ebb638c3c2a5854bb25e82
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon May 15 21:54:21 2023 +0800
[improvement](thread) stop threads when BE exit gracefully (#19506)
---
be/src/olap/CMakeLists.txt | 1 +
be/src/olap/olap_meta.cpp | 11 +++
be/src/olap/tablet_schema_cache.cpp | 81 ++++++++++++++++++++++
be/src/olap/tablet_schema_cache.h | 44 +++---------
be/src/runtime/exec_env_init.cpp | 7 ++
.../runtime/stream_load/stream_load_recorder.cpp | 7 ++
be/src/service/doris_main.cpp | 3 +-
7 files changed, 119 insertions(+), 35 deletions(-)
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 6398692be2..b8321c5898 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -65,6 +65,7 @@ add_library(Olap STATIC
tablet_meta.cpp
tablet_meta_manager.cpp
tablet_schema.cpp
+ tablet_schema_cache.cpp
txn_manager.cpp
types.cpp
utils.cpp
diff --git a/be/src/olap/olap_meta.cpp b/be/src/olap/olap_meta.cpp
index 35ea9b8d8e..8b325c084f 100644
--- a/be/src/olap/olap_meta.cpp
+++ b/be/src/olap/olap_meta.cpp
@@ -29,6 +29,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "olap/olap_define.h"
+#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
@@ -61,8 +62,18 @@ OlapMeta::~OlapMeta() {
_db->DestroyColumnFamilyHandle(handle);
handle = nullptr;
}
+ rocksdb::Status s = _db->SyncWAL();
+ if (!s.ok()) {
+ LOG(WARNING) << "rocksdb sync wal failed: " << s.ToString();
+ }
+ rocksdb::CancelAllBackgroundWork(_db, true);
+ s = _db->Close();
+ if (!s.ok()) {
+ LOG(WARNING) << "rocksdb close failed: " << s.ToString();
+ }
delete _db;
_db = nullptr;
+ LOG(INFO) << "finish close rocksdb for OlapMeta";
}
}
diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp
new file mode 100644
index 0000000000..24aa72a995
--- /dev/null
+++ b/be/src/olap/tablet_schema_cache.cpp
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+TabletSchemaSPtr TabletSchemaCache::insert(const std::string& key) {
+ DCHECK(_s_instance != nullptr);
+ std::lock_guard guard(_mtx);
+ auto iter = _cache.find(key);
+ if (iter == _cache.end()) {
+ TabletSchemaSPtr tablet_schema_ptr = std::make_shared<TabletSchema>();
+ TabletSchemaPB pb;
+ pb.ParseFromString(key);
+ tablet_schema_ptr->init_from_pb(pb);
+ _cache[key] = tablet_schema_ptr;
+ DorisMetrics::instance()->tablet_schema_cache_count->increment(1);
+ DorisMetrics::instance()->tablet_schema_cache_memory_bytes->increment(
+ tablet_schema_ptr->mem_size());
+ return tablet_schema_ptr;
+ }
+ return iter->second;
+}
+
+void TabletSchemaCache::stop() {
+ _should_stop = true;
+ while (!_is_stopped) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ LOG(INFO) << "xxx stopped";
+}
+
+/**
+ * @brief recycle when TabletSchemaSPtr use_count equals 1.
+ */
+void TabletSchemaCache::_recycle() {
+ int64_t tablet_schema_cache_recycle_interval = 86400; // s, one day
+ int64_t check_interval = 5;
+ int64_t left_second = tablet_schema_cache_recycle_interval;
+ while (!_should_stop) {
+ if (left_second > 0) {
+ std::this_thread::sleep_for(std::chrono::seconds(check_interval));
+ left_second -= check_interval;
+ continue;
+ } else {
+ left_second = tablet_schema_cache_recycle_interval;
+ }
+
+ std::lock_guard guard(_mtx);
+ LOG(INFO) << "Tablet Schema Cache Capacity " << _cache.size();
+ for (auto iter = _cache.begin(), last = _cache.end(); iter != last;) {
+ if (iter->second.unique()) {
+ DorisMetrics::instance()->tablet_schema_cache_memory_bytes->increment(
+ -iter->second->mem_size());
+ DorisMetrics::instance()->tablet_schema_cache_count->increment(-1);
+ iter = _cache.erase(iter);
+ } else {
+ ++iter;
+ }
+ }
+ }
+ _is_stopped = true;
+ LOG(INFO) << "xxx yyy stopped ";
+}
+
+} // namespace doris
diff --git a/be/src/olap/tablet_schema_cache.h b/be/src/olap/tablet_schema_cache.h
index 47bfcb16f8..f6c1e61de3 100644
--- a/be/src/olap/tablet_schema_cache.h
+++ b/be/src/olap/tablet_schema_cache.h
@@ -40,51 +40,27 @@ public:
static TabletSchemaCache* instance() { return _s_instance; }
- TabletSchemaSPtr insert(const std::string& key) {
+ static void stop_and_join() {
DCHECK(_s_instance != nullptr);
- std::lock_guard guard(_mtx);
- auto iter = _cache.find(key);
- if (iter == _cache.end()) {
- TabletSchemaSPtr tablet_schema_ptr = std::make_shared<TabletSchema>();
- TabletSchemaPB pb;
- pb.ParseFromString(key);
- tablet_schema_ptr->init_from_pb(pb);
- _cache[key] = tablet_schema_ptr;
- DorisMetrics::instance()->tablet_schema_cache_count->increment(1);
- DorisMetrics::instance()->tablet_schema_cache_memory_bytes->increment(
- tablet_schema_ptr->mem_size());
- return tablet_schema_ptr;
- }
- return iter->second;
+ _s_instance->stop();
}
+ TabletSchemaSPtr insert(const std::string& key);
+
+ void stop();
+
private:
/**
* @brief recycle when TabletSchemaSPtr use_count equals 1.
*/
- void _recycle() {
- int64_t tablet_schema_cache_recycle_interval = 86400; // s, one day
- for (;;) {
- std::this_thread::sleep_for(std::chrono::seconds(tablet_schema_cache_recycle_interval));
- std::lock_guard guard(_mtx);
- LOG(INFO) << "Tablet Schema Cache Capacity " << _cache.size();
- for (auto iter = _cache.begin(), last = _cache.end(); iter != last;) {
- if (iter->second.unique()) {
- DorisMetrics::instance()->tablet_schema_cache_memory_bytes->increment(
- -iter->second->mem_size());
- DorisMetrics::instance()->tablet_schema_cache_count->increment(-1);
- iter = _cache.erase(iter);
- } else {
- ++iter;
- }
- }
- }
- }
+ void _recycle();
private:
static inline TabletSchemaCache* _s_instance = nullptr;
std::mutex _mtx;
std::unordered_map<std::string, TabletSchemaSPtr> _cache;
+ std::atomic_bool _should_stop = {false};
+ std::atomic_bool _is_stopped = {false};
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8d72a4d54f..7671f3c4ad 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -387,6 +387,13 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_heartbeat_flags);
SAFE_DELETE(_scanner_scheduler);
+ _send_batch_thread_pool.reset(nullptr);
+ _buffered_reader_prefetch_thread_pool.reset(nullptr);
+ _send_report_thread_pool.reset(nullptr);
+ _join_node_thread_pool.reset(nullptr);
+ _serial_download_cache_thread_token.reset(nullptr);
+ _download_cache_thread_pool.reset(nullptr);
+
_is_init = false;
}
diff --git a/be/src/runtime/stream_load/stream_load_recorder.cpp b/be/src/runtime/stream_load/stream_load_recorder.cpp
index bb055e43bd..71c7c7c229 100644
--- a/be/src/runtime/stream_load/stream_load_recorder.cpp
+++ b/be/src/runtime/stream_load/stream_load_recorder.cpp
@@ -27,6 +27,7 @@
#include "common/config.h"
#include "common/status.h"
+#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
@@ -45,8 +46,14 @@ StreamLoadRecorder::~StreamLoadRecorder() {
_db->DestroyColumnFamilyHandle(handle);
handle = nullptr;
}
+ rocksdb::Status s = _db->SyncWAL();
+ if (!s.ok()) {
+ LOG(WARNING) << "rocksdb sync wal failed: " << s.ToString();
+ }
+ rocksdb::CancelAllBackgroundWork(_db, true);
delete _db;
_db = nullptr;
+ LOG(INFO) << "finish close rocksdb for ~StreamLoadRecorder";
}
}
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index cc9037e909..699f598967 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -510,6 +510,7 @@ int main(int argc, char** argv) {
sleep(10);
}
+ doris::TabletSchemaCache::stop_and_join();
http_service.stop();
brpc_service.join();
daemon.stop();
@@ -526,9 +527,9 @@ int main(int argc, char** argv) {
heartbeat_thrift_server = nullptr;
doris::ExecEnv::destroy(exec_env);
-
delete engine;
engine = nullptr;
+
return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]