This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d0d9c4267dd [fix](move-memtable) fix use-after-free in load stream stub (#25618)
d0d9c4267dd is described below
commit d0d9c4267dd4556e698ca3e42c537d3228614e0e
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Oct 19 16:11:35 2023 +0800
[fix](move-memtable) fix use-after-free in load stream stub (#25618)
---
be/src/vec/sink/load_stream_stub.cpp | 24 ++++++------
be/src/vec/sink/load_stream_stub.h | 71 ++++++++++++++++++++++--------------
be/src/vec/sink/vtablet_sink_v2.cpp | 2 +-
3 files changed, 56 insertions(+), 41 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index c36756cc95d..821c46dc433 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -35,15 +35,15 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
Status st = Status::create(response.status());
std::stringstream ss;
- ss << "received response from backend " << _stub->_dst_id;
+ ss << "received response from backend " << _dst_id;
if (response.success_tablet_ids_size() > 0) {
ss << ", success tablet ids:";
for (auto tablet_id : response.success_tablet_ids()) {
ss << " " << tablet_id;
}
- std::lock_guard<bthread::Mutex>
lock(_stub->_success_tablets_mutex);
+ std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
for (auto tablet_id : response.success_tablet_ids()) {
- _stub->_success_tablets.push_back(tablet_id);
+ _success_tablets.push_back(tablet_id);
}
}
if (response.failed_tablet_ids_size() > 0) {
@@ -51,9 +51,9 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
for (auto tablet_id : response.failed_tablet_ids()) {
ss << " " << tablet_id;
}
- std::lock_guard<bthread::Mutex> lock(_stub->_failed_tablets_mutex);
+ std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
for (auto tablet_id : response.failed_tablet_ids()) {
- _stub->_failed_tablets.push_back(tablet_id);
+ _failed_tablets.push_back(tablet_id);
}
}
ss << ", status: " << st;
@@ -78,9 +78,9 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
}
void LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
- std::lock_guard<bthread::Mutex> lock(_stub->_mutex);
- _stub->_is_closed = true;
- _stub->_close_cv.notify_all();
+ std::lock_guard<bthread::Mutex> lock(_mutex);
+ _is_closed.store(true);
+ _close_cv.notify_all();
}
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id)
@@ -96,8 +96,7 @@ LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
_enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
LoadStreamStub::~LoadStreamStub() {
- std::unique_lock<bthread::Mutex> lock(_mutex);
- if (_is_init && !_is_closed) {
+ if (_is_init.load() && !_handler.is_closed()) {
brpc::StreamClose(_stream_id);
}
}
@@ -110,10 +109,11 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const std::vector<PTabletID>& tablets_for_schema,
bool enable_profile) {
_num_open++;
std::unique_lock<bthread::Mutex> lock(_mutex);
- if (_is_init) {
+ if (_is_init.load()) {
return Status::OK();
}
_dst_id = node_info.id;
+ _handler.set_dst_id(_dst_id);
std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
brpc::StreamOptions opt;
opt.max_buf_size = 20 << 20; // 20MB
@@ -152,7 +152,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
LOG(INFO) << "Opened stream " << _stream_id << " for backend " << _dst_id
<< " (" << host_port
<< ")";
- _is_init = true;
+ _is_init.store(true);
return Status::OK();
}
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 20cf5fc02ae..bd3ddec0d65 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -85,8 +85,6 @@ class LoadStreamStub {
private:
class LoadStreamReplyHandler : public brpc::StreamInputHandler {
public:
- LoadStreamReplyHandler(LoadStreamStub* stub) : _stub(stub) {}
-
int on_received_messages(brpc::StreamId id, butil::IOBuf* const
messages[],
size_t size) override;
@@ -94,8 +92,44 @@ private:
void on_closed(brpc::StreamId id) override;
+ bool is_closed() { return _is_closed.load(); }
+
+ Status close_wait(int64_t timeout_ms) {
+ std::unique_lock<bthread::Mutex> lock(_mutex);
+ if (_is_closed) {
+ return Status::OK();
+ }
+ if (timeout_ms > 0) {
+ int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
+ return ret == 0 ? Status::OK()
+ : Status::Error<true>(ret, "stream close_wait
timeout");
+ }
+ _close_cv.wait(lock);
+ return Status::OK();
+ };
+
+ std::vector<int64_t> success_tablets() {
+ std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
+ return _success_tablets;
+ }
+
+ std::vector<int64_t> failed_tablets() {
+ std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
+ return _failed_tablets;
+ }
+
+ void set_dst_id(int64_t dst_id) { _dst_id = dst_id; }
+
private:
- LoadStreamStub* _stub;
+ int64_t _dst_id = -1; // for logging
+ std::atomic<bool> _is_closed;
+ bthread::Mutex _mutex;
+ bthread::ConditionVariable _close_cv;
+
+ bthread::Mutex _success_tablets_mutex;
+ bthread::Mutex _failed_tablets_mutex;
+ std::vector<int64_t> _success_tablets;
+ std::vector<int64_t> _failed_tablets;
};
public:
@@ -135,16 +169,10 @@ public:
// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
Status close_wait(int64_t timeout_ms = 0) {
- std::unique_lock<bthread::Mutex> lock(_mutex);
- if (!_is_init || _is_closed) {
+ if (!_is_init.load() || _handler.is_closed()) {
return Status::OK();
}
- if (timeout_ms > 0) {
- int ret = _close_cv.wait_for(lock, timeout_ms * 1000);
- return ret == 0 ? Status::OK() : Status::Error<true>(ret, "stream
close_wait timeout");
- }
- _close_cv.wait(lock);
- return Status::OK();
+ return _handler.close_wait(timeout_ms);
}
std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
@@ -155,15 +183,9 @@ public:
return _enable_unique_mow_for_index->at(index_id);
}
- std::vector<int64_t> success_tablets() {
- std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
- return _success_tablets;
- }
+ std::vector<int64_t> success_tablets() { return
_handler.success_tablets(); }
- std::vector<int64_t> failed_tablets() {
- std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
- return _failed_tablets;
- }
+ std::vector<int64_t> failed_tablets() { return _handler.failed_tablets(); }
brpc::StreamId stream_id() const { return _stream_id; }
@@ -177,10 +199,8 @@ private:
Status _send_with_retry(butil::IOBuf& buf);
protected:
- bool _is_init = false;
- bool _is_closed = false;
+ std::atomic<bool> _is_init;
bthread::Mutex _mutex;
- bthread::ConditionVariable _close_cv;
std::atomic<int> _num_open;
@@ -192,12 +212,7 @@ protected:
brpc::StreamId _stream_id;
int64_t _src_id = -1; // source backend_id
int64_t _dst_id = -1; // destination backend_id
- LoadStreamReplyHandler _handler {this};
-
- bthread::Mutex _success_tablets_mutex;
- bthread::Mutex _failed_tablets_mutex;
- std::vector<int64_t> _success_tablets;
- std::vector<int64_t> _failed_tablets;
+ LoadStreamReplyHandler _handler;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index 13d25f5cf46..d6d6d8f3351 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -394,7 +394,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(_close_load_timer);
for (const auto& [_, streams] : _streams_for_node) {
for (const auto& stream : *streams) {
- static_cast<void>(stream->close_wait());
+ RETURN_IF_ERROR(stream->close_wait());
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]