This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new db0dbda5f6 [fix](merge-on-write) fix dead lock when publish (#21339) (#24209)
db0dbda5f6 is described below

commit db0dbda5f6196f5972fb032234a78b8bba1ece1c
Author: Xin Liao <[email protected]>
AuthorDate: Tue Sep 12 11:07:19 2023 +0800

    [fix](merge-on-write) fix dead lock when publish (#21339) (#24209)
---
 be/src/olap/task/engine_publish_version_task.cpp | 35 ++++--------------------
 be/src/olap/task/engine_publish_version_task.h   |  7 -----
 2 files changed, 6 insertions(+), 36 deletions(-)

diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index eff14dfdb0..c793ddbf6c 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -34,8 +34,7 @@ using std::map;
 EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& 
publish_version_req,
                                                    std::vector<TTabletId>* 
error_tablet_ids,
                                                    std::vector<TTabletId>* 
succ_tablet_ids)
-        : _total_task_num(0),
-          _publish_version_req(publish_version_req),
+        : _publish_version_req(publish_version_req),
           _error_tablet_ids(error_tablet_ids),
           _succ_tablet_ids(succ_tablet_ids) {}
 
@@ -49,25 +48,14 @@ void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) {
     _succ_tablet_ids->push_back(tablet_id);
 }
 
-void EnginePublishVersionTask::wait() {
-    std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
-    _tablet_finish_cond.wait(lock);
-}
-
-void EnginePublishVersionTask::notify() {
-    std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
-    _tablet_finish_cond.notify_one();
-}
-
-int64_t EnginePublishVersionTask::finish_task() {
-    return _total_task_num.fetch_sub(1);
-}
-
 Status EnginePublishVersionTask::finish() {
     Status res = Status::OK();
     int64_t transaction_id = _publish_version_req.transaction_id;
     OlapStopWatch watch;
     VLOG_NOTICE << "begin to process publish version. transaction_id=" << 
transaction_id;
+    std::unique_ptr<ThreadPoolToken> token =
+            
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
+                    ThreadPool::ExecutionMode::CONCURRENT);
 
     // each partition
     for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
@@ -145,19 +133,13 @@ Status EnginePublishVersionTask::finish() {
                     continue;
                 }
             }
-            _total_task_num.fetch_add(1);
             auto tablet_publish_txn_ptr = 
std::make_shared<TabletPublishTxnTask>(
                     this, tablet, rowset, partition_id, transaction_id, 
version, tablet_info);
-            auto submit_st =
-                    
StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
-                            [=]() { tablet_publish_txn_ptr->handle(); });
+            auto submit_st = token->submit_func([=]() { 
tablet_publish_txn_ptr->handle(); });
             CHECK(submit_st.ok()) << submit_st;
         }
     }
-    // wait for all publish txn finished
-    while (_total_task_num.load() != 0) {
-        wait();
-    }
+    token->wait();
 
     // check if the related tablet remained all have the version
     for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
@@ -209,11 +191,6 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
           _tablet_info(tablet_info) {}
 
 void TabletPublishTxnTask::handle() {
-    Defer defer {[&] {
-        if (_engine_publish_version_task->finish_task() == 1) {
-            _engine_publish_version_task->notify();
-        }
-    }};
     auto publish_status = 
StorageEngine::instance()->txn_manager()->publish_txn(
             _partition_id, _tablet, _transaction_id, _version);
     if (publish_status != Status::OK()) {
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 0a0de3efc0..0a918ba402 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -57,20 +57,13 @@ public:
     void add_error_tablet_id(int64_t tablet_id);
     void add_succ_tablet_id(int64_t tablet_id);
 
-    void notify();
-    void wait();
-
     int64_t finish_task();
 
 private:
-    std::atomic<int64_t> _total_task_num;
     const TPublishVersionRequest& _publish_version_req;
     std::mutex _tablet_ids_mutex;
     vector<TTabletId>* _error_tablet_ids;
     vector<TTabletId>* _succ_tablet_ids;
-
-    std::mutex _tablet_finish_mutex;
-    std::condition_variable _tablet_finish_cond;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to