This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 643db55a78 [improvement](thread) stop threads when BE exit gracefully (#19506)
643db55a78 is described below

commit 643db55a78d2b46515ebb638c3c2a5854bb25e82
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon May 15 21:54:21 2023 +0800

    [improvement](thread) stop threads when BE exit gracefully (#19506)
---
 be/src/olap/CMakeLists.txt                         |  1 +
 be/src/olap/olap_meta.cpp                          | 11 +++
 be/src/olap/tablet_schema_cache.cpp                | 81 ++++++++++++++++++++++
 be/src/olap/tablet_schema_cache.h                  | 44 +++---------
 be/src/runtime/exec_env_init.cpp                   |  7 ++
 .../runtime/stream_load/stream_load_recorder.cpp   |  7 ++
 be/src/service/doris_main.cpp                      |  3 +-
 7 files changed, 119 insertions(+), 35 deletions(-)

diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 6398692be2..b8321c5898 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -65,6 +65,7 @@ add_library(Olap STATIC
     tablet_meta.cpp
     tablet_meta_manager.cpp
     tablet_schema.cpp
+    tablet_schema_cache.cpp
     txn_manager.cpp
     types.cpp 
     utils.cpp
diff --git a/be/src/olap/olap_meta.cpp b/be/src/olap/olap_meta.cpp
index 35ea9b8d8e..8b325c084f 100644
--- a/be/src/olap/olap_meta.cpp
+++ b/be/src/olap/olap_meta.cpp
@@ -29,6 +29,7 @@
 #include "common/config.h"
 #include "common/logging.h"
 #include "olap/olap_define.h"
+#include "rocksdb/convenience.h"
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
@@ -61,8 +62,18 @@ OlapMeta::~OlapMeta() {
             _db->DestroyColumnFamilyHandle(handle);
             handle = nullptr;
         }
+        rocksdb::Status s = _db->SyncWAL();
+        if (!s.ok()) {
+            LOG(WARNING) << "rocksdb sync wal failed: " << s.ToString();
+        }
+        rocksdb::CancelAllBackgroundWork(_db, true);
+        s = _db->Close();
+        if (!s.ok()) {
+            LOG(WARNING) << "rocksdb close failed: " << s.ToString();
+        }
         delete _db;
         _db = nullptr;
+        LOG(INFO) << "finish close rocksdb for OlapMeta";
     }
 }
 
diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp
new file mode 100644
index 0000000000..24aa72a995
--- /dev/null
+++ b/be/src/olap/tablet_schema_cache.cpp
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+TabletSchemaSPtr TabletSchemaCache::insert(const std::string& key) {
+    DCHECK(_s_instance != nullptr);
+    std::lock_guard guard(_mtx);
+    auto iter = _cache.find(key);
+    if (iter == _cache.end()) {
+        TabletSchemaSPtr tablet_schema_ptr = std::make_shared<TabletSchema>();
+        TabletSchemaPB pb;
+        pb.ParseFromString(key);
+        tablet_schema_ptr->init_from_pb(pb);
+        _cache[key] = tablet_schema_ptr;
+        DorisMetrics::instance()->tablet_schema_cache_count->increment(1);
+        DorisMetrics::instance()->tablet_schema_cache_memory_bytes->increment(
+                tablet_schema_ptr->mem_size());
+        return tablet_schema_ptr;
+    }
+    return iter->second;
+}
+
+void TabletSchemaCache::stop() {
+    _should_stop = true;
+    while (!_is_stopped) {
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    LOG(INFO) << "xxx stopped";
+}
+
+/**
+ * @brief recycle when TabletSchemaSPtr use_count equals 1.
+ */
+void TabletSchemaCache::_recycle() {
+    int64_t tablet_schema_cache_recycle_interval = 86400; // s, one day
+    int64_t check_interval = 5;
+    int64_t left_second = tablet_schema_cache_recycle_interval;
+    while (!_should_stop) {
+        if (left_second > 0) {
+            std::this_thread::sleep_for(std::chrono::seconds(check_interval));
+            left_second -= check_interval;
+            continue;
+        } else {
+            left_second = tablet_schema_cache_recycle_interval;
+        }
+
+        std::lock_guard guard(_mtx);
+        LOG(INFO) << "Tablet Schema Cache Capacity " << _cache.size();
+        for (auto iter = _cache.begin(), last = _cache.end(); iter != last;) {
+            if (iter->second.unique()) {
+                DorisMetrics::instance()->tablet_schema_cache_memory_bytes->increment(
+                        -iter->second->mem_size());
+                DorisMetrics::instance()->tablet_schema_cache_count->increment(-1);
+                iter = _cache.erase(iter);
+            } else {
+                ++iter;
+            }
+        }
+    }
+    _is_stopped = true;
+    LOG(INFO) << "xxx yyy stopped ";
+}
+
+} // namespace doris
diff --git a/be/src/olap/tablet_schema_cache.h b/be/src/olap/tablet_schema_cache.h
index 47bfcb16f8..f6c1e61de3 100644
--- a/be/src/olap/tablet_schema_cache.h
+++ b/be/src/olap/tablet_schema_cache.h
@@ -40,51 +40,27 @@ public:
 
     static TabletSchemaCache* instance() { return _s_instance; }
 
-    TabletSchemaSPtr insert(const std::string& key) {
+    static void stop_and_join() {
         DCHECK(_s_instance != nullptr);
-        std::lock_guard guard(_mtx);
-        auto iter = _cache.find(key);
-        if (iter == _cache.end()) {
-            TabletSchemaSPtr tablet_schema_ptr = std::make_shared<TabletSchema>();
-            TabletSchemaPB pb;
-            pb.ParseFromString(key);
-            tablet_schema_ptr->init_from_pb(pb);
-            _cache[key] = tablet_schema_ptr;
-            DorisMetrics::instance()->tablet_schema_cache_count->increment(1);
-            DorisMetrics::instance()->tablet_schema_cache_memory_bytes->increment(
-                    tablet_schema_ptr->mem_size());
-            return tablet_schema_ptr;
-        }
-        return iter->second;
+        _s_instance->stop();
     }
 
+    TabletSchemaSPtr insert(const std::string& key);
+
+    void stop();
+
 private:
     /**
      * @brief recycle when TabletSchemaSPtr use_count equals 1.
      */
-    void _recycle() {
-        int64_t tablet_schema_cache_recycle_interval = 86400; // s, one day
-        for (;;) {
-            std::this_thread::sleep_for(std::chrono::seconds(tablet_schema_cache_recycle_interval));
-            std::lock_guard guard(_mtx);
-            LOG(INFO) << "Tablet Schema Cache Capacity " << _cache.size();
-            for (auto iter = _cache.begin(), last = _cache.end(); iter != last;) {
-                if (iter->second.unique()) {
-                    DorisMetrics::instance()->tablet_schema_cache_memory_bytes->increment(
-                            -iter->second->mem_size());
-                    DorisMetrics::instance()->tablet_schema_cache_count->increment(-1);
-                    iter = _cache.erase(iter);
-                } else {
-                    ++iter;
-                }
-            }
-        }
-    }
+    void _recycle();
 
 private:
     static inline TabletSchemaCache* _s_instance = nullptr;
     std::mutex _mtx;
     std::unordered_map<std::string, TabletSchemaSPtr> _cache;
+    std::atomic_bool _should_stop = {false};
+    std::atomic_bool _is_stopped = {false};
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8d72a4d54f..7671f3c4ad 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -387,6 +387,13 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_heartbeat_flags);
     SAFE_DELETE(_scanner_scheduler);
 
+    _send_batch_thread_pool.reset(nullptr);
+    _buffered_reader_prefetch_thread_pool.reset(nullptr);
+    _send_report_thread_pool.reset(nullptr);
+    _join_node_thread_pool.reset(nullptr);
+    _serial_download_cache_thread_token.reset(nullptr);
+    _download_cache_thread_pool.reset(nullptr);
+
     _is_init = false;
 }
 
diff --git a/be/src/runtime/stream_load/stream_load_recorder.cpp b/be/src/runtime/stream_load/stream_load_recorder.cpp
index bb055e43bd..71c7c7c229 100644
--- a/be/src/runtime/stream_load/stream_load_recorder.cpp
+++ b/be/src/runtime/stream_load/stream_load_recorder.cpp
@@ -27,6 +27,7 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "rocksdb/convenience.h"
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
@@ -45,8 +46,14 @@ StreamLoadRecorder::~StreamLoadRecorder() {
             _db->DestroyColumnFamilyHandle(handle);
             handle = nullptr;
         }
+        rocksdb::Status s = _db->SyncWAL();
+        if (!s.ok()) {
+            LOG(WARNING) << "rocksdb sync wal failed: " << s.ToString();
+        }
+        rocksdb::CancelAllBackgroundWork(_db, true);
         delete _db;
         _db = nullptr;
+        LOG(INFO) << "finish close rocksdb for ~StreamLoadRecorder";
     }
 }
 
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index cc9037e909..699f598967 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -510,6 +510,7 @@ int main(int argc, char** argv) {
         sleep(10);
     }
 
+    doris::TabletSchemaCache::stop_and_join();
     http_service.stop();
     brpc_service.join();
     daemon.stop();
@@ -526,9 +527,9 @@ int main(int argc, char** argv) {
     heartbeat_thrift_server = nullptr;
 
     doris::ExecEnv::destroy(exec_env);
-
     delete engine;
     engine = nullptr;
+
     return 0;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to