This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d0d9c4267dd [fix](move-memtable) fix use-after-free in load stream stub (#25618)
d0d9c4267dd is described below

commit d0d9c4267dd4556e698ca3e42c537d3228614e0e
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Oct 19 16:11:35 2023 +0800

    [fix](move-memtable) fix use-after-free in load stream stub (#25618)
---
 be/src/vec/sink/load_stream_stub.cpp | 24 ++++++------
 be/src/vec/sink/load_stream_stub.h   | 71 ++++++++++++++++++++++--------------
 be/src/vec/sink/vtablet_sink_v2.cpp  |  2 +-
 3 files changed, 56 insertions(+), 41 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index c36756cc95d..821c46dc433 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -35,15 +35,15 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
         Status st = Status::create(response.status());
 
         std::stringstream ss;
-        ss << "received response from backend " << _stub->_dst_id;
+        ss << "received response from backend " << _dst_id;
         if (response.success_tablet_ids_size() > 0) {
             ss << ", success tablet ids:";
             for (auto tablet_id : response.success_tablet_ids()) {
                 ss << " " << tablet_id;
             }
-            std::lock_guard<bthread::Mutex> lock(_stub->_success_tablets_mutex);
+            std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
             for (auto tablet_id : response.success_tablet_ids()) {
-                _stub->_success_tablets.push_back(tablet_id);
+                _success_tablets.push_back(tablet_id);
             }
         }
         if (response.failed_tablet_ids_size() > 0) {
@@ -51,9 +51,9 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
             for (auto tablet_id : response.failed_tablet_ids()) {
                 ss << " " << tablet_id;
             }
-            std::lock_guard<bthread::Mutex> lock(_stub->_failed_tablets_mutex);
+            std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
             for (auto tablet_id : response.failed_tablet_ids()) {
-                _stub->_failed_tablets.push_back(tablet_id);
+                _failed_tablets.push_back(tablet_id);
             }
         }
         ss << ", status: " << st;
@@ -78,9 +78,9 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
 }
 
 void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
-    std::lock_guard<bthread::Mutex> lock(_stub->_mutex);
-    _stub->_is_closed = true;
-    _stub->_close_cv.notify_all();
+    std::lock_guard<bthread::Mutex> lock(_mutex);
+    _is_closed.store(true);
+    _close_cv.notify_all();
 }
 
 LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id)
@@ -96,8 +96,7 @@ LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
           _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
 
 LoadStreamStub::~LoadStreamStub() {
-    std::unique_lock<bthread::Mutex> lock(_mutex);
-    if (_is_init && !_is_closed) {
+    if (_is_init.load() && !_handler.is_closed()) {
         brpc::StreamClose(_stream_id);
     }
 }
@@ -110,10 +109,11 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
                             const std::vector<PTabletID>& tablets_for_schema, bool enable_profile) {
     _num_open++;
     std::unique_lock<bthread::Mutex> lock(_mutex);
-    if (_is_init) {
+    if (_is_init.load()) {
         return Status::OK();
     }
     _dst_id = node_info.id;
+    _handler.set_dst_id(_dst_id);
     std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
     brpc::StreamOptions opt;
     opt.max_buf_size = 20 << 20; // 20MB
@@ -152,7 +152,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     }
     LOG(INFO) << "Opened stream " << _stream_id << " for backend " << _dst_id << " (" << host_port
               << ")";
-    _is_init = true;
+    _is_init.store(true);
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 20cf5fc02ae..bd3ddec0d65 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -85,8 +85,6 @@ class LoadStreamStub {
 private:
     class LoadStreamReplyHandler : public brpc::StreamInputHandler {
     public:
-        LoadStreamReplyHandler(LoadStreamStub* stub) : _stub(stub) {}
-
         int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
                                  size_t size) override;
 
@@ -94,8 +92,44 @@ private:
 
         void on_closed(brpc::StreamId id) override;
 
+        bool is_closed() { return _is_closed.load(); }
+
+        Status close_wait(int64_t timeout_ms) {
+            std::unique_lock<bthread::Mutex> lock(_mutex);
+            if (_is_closed) {
+                return Status::OK();
+            }
+            if (timeout_ms > 0) {
+                int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
+                return ret == 0 ? Status::OK()
+                                : Status::Error<true>(ret, "stream close_wait timeout");
+            }
+            _close_cv.wait(lock);
+            return Status::OK();
+        };
+
+        std::vector<int64_t> success_tablets() {
+            std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
+            return _success_tablets;
+        }
+
+        std::vector<int64_t> failed_tablets() {
+            std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
+            return _failed_tablets;
+        }
+
+        void set_dst_id(int64_t dst_id) { _dst_id = dst_id; }
+
     private:
-        LoadStreamStub* _stub;
+        int64_t _dst_id = -1; // for logging
+        std::atomic<bool> _is_closed;
+        bthread::Mutex _mutex;
+        bthread::ConditionVariable _close_cv;
+
+        bthread::Mutex _success_tablets_mutex;
+        bthread::Mutex _failed_tablets_mutex;
+        std::vector<int64_t> _success_tablets;
+        std::vector<int64_t> _failed_tablets;
     };
 
 public:
@@ -135,16 +169,10 @@ public:
     // wait remote to close stream,
     // remote will close stream when it receives CLOSE_LOAD
     Status close_wait(int64_t timeout_ms = 0) {
-        std::unique_lock<bthread::Mutex> lock(_mutex);
-        if (!_is_init || _is_closed) {
+        if (!_is_init.load() || _handler.is_closed()) {
             return Status::OK();
         }
-        if (timeout_ms > 0) {
-            int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
-            return ret == 0 ? Status::OK() : Status::Error<true>(ret, "stream close_wait timeout");
-        }
-        _close_cv.wait(lock);
-        return Status::OK();
+        return _handler.close_wait(timeout_ms);
     }
 
     std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
@@ -155,15 +183,9 @@ public:
         return _enable_unique_mow_for_index->at(index_id);
     }
 
-    std::vector<int64_t> success_tablets() {
-        std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
-        return _success_tablets;
-    }
+    std::vector<int64_t> success_tablets() { return _handler.success_tablets(); }
 
-    std::vector<int64_t> failed_tablets() {
-        std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
-        return _failed_tablets;
-    }
+    std::vector<int64_t> failed_tablets() { return _handler.failed_tablets(); }
 
     brpc::StreamId stream_id() const { return _stream_id; }
 
@@ -177,10 +199,8 @@ private:
     Status _send_with_retry(butil::IOBuf& buf);
 
 protected:
-    bool _is_init = false;
-    bool _is_closed = false;
+    std::atomic<bool> _is_init;
     bthread::Mutex _mutex;
-    bthread::ConditionVariable _close_cv;
 
     std::atomic<int> _num_open;
 
@@ -192,12 +212,7 @@ protected:
     brpc::StreamId _stream_id;
     int64_t _src_id = -1; // source backend_id
     int64_t _dst_id = -1; // destination backend_id
-    LoadStreamReplyHandler _handler {this};
-
-    bthread::Mutex _success_tablets_mutex;
-    bthread::Mutex _failed_tablets_mutex;
-    std::vector<int64_t> _success_tablets;
-    std::vector<int64_t> _failed_tablets;
+    LoadStreamReplyHandler _handler;
 
     std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
     std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index 13d25f5cf46..d6d6d8f3351 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -394,7 +394,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
             SCOPED_TIMER(_close_load_timer);
             for (const auto& [_, streams] : _streams_for_node) {
                 for (const auto& stream : *streams) {
-                    static_cast<void>(stream->close_wait());
+                    RETURN_IF_ERROR(stream->close_wait());
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to