This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 1337db03 Fix batch create stream and make SetHostSocket thread safe (#2938)
1337db03 is described below
commit 1337db0331037240de6424a1f6373ceb7235bd13
Author: Jenrry You <[email protected]>
AuthorDate: Fri Apr 18 16:30:56 2025 +0800
Fix batch create stream and make SetHostSocket thread safe (#2938)
---
src/brpc/controller.cpp | 4 ++--
src/brpc/stream.cpp | 25 ++++++++++---------------
src/brpc/stream_impl.h | 3 ++-
3 files changed, 14 insertions(+), 18 deletions(-)
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 0cb83dc5..1362d322 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -1439,12 +1439,12 @@ void Controller::HandleStreamConnection(Socket *host_socket) {
Stream* s = (Stream*)ptrs[0]->conn();
s->SetConnected(_remote_stream_settings);
if (stream_num > 1) {
- const auto& extra_stream_ids = _remote_stream_settings->extra_stream_ids();
+ auto extra_stream_ids = std::move(*_remote_stream_settings->mutable_extra_stream_ids());
_remote_stream_settings->clear_extra_stream_ids();
for (size_t i = 1; i < stream_num; ++i) {
Stream* extra_stream = (Stream *) ptrs[i]->conn();
_remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]);
- s->ShareHostSocket(*extra_stream);
+ s->SetHostSocket(host_socket);
extra_stream->SetConnected(_remote_stream_settings);
}
}
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp
index 68397b57..2a443054 100644
--- a/src/brpc/stream.cpp
+++ b/src/brpc/stream.cpp
@@ -640,17 +640,16 @@ void Stream::SendFeedback() {
}
int Stream::SetHostSocket(Socket *host_socket) {
- if (_host_socket != NULL) {
- CHECK(false) << "SetHostSocket has already been called";
- return -1;
- }
- SocketUniquePtr ptr;
- host_socket->ReAddress(&ptr);
- // TODO add *this to host socke
- if (ptr->AddStream(id()) != 0) {
- return -1;
- }
- _host_socket = ptr.release();
+ std::call_once(_set_host_socket_flag, [this, host_socket]() {
+ SocketUniquePtr ptr;
+ host_socket->ReAddress(&ptr);
+ // TODO add *this to host socke
+ if (ptr->AddStream(id()) != 0) {
+ CHECK(false) << id() << " fail to add stream to host socket";
+ return;
+ }
+ _host_socket = ptr.release();
+ });
return 0;
}
@@ -710,10 +709,6 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) {
return TriggerOnConnectIfNeed();
}
-int Stream::ShareHostSocket(Stream& other_stream) {
- return other_stream.SetHostSocket(_host_socket);
-}
-
int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) {
SocketUniquePtr ptr;
if (Socket::AddressFailedAsWell(id, &ptr) == -1) {
diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h
index 66e0d719..5ff7cb04 100644
--- a/src/brpc/stream_impl.h
+++ b/src/brpc/stream_impl.h
@@ -19,6 +19,7 @@
#ifndef BRPC_STREAM_IMPL_H
#define BRPC_STREAM_IMPL_H
+#include <mutex>
#include "bthread/bthread.h"
#include "bthread/execution_queue.h"
#include "brpc/socket.h"
@@ -67,7 +68,6 @@ public:
__attribute__ ((__format__ (__printf__, 3, 4)));
void Close(int error_code, const char* reason_fmt, ...)
__attribute__ ((__format__ (__printf__, 3, 4)));
- int ShareHostSocket(Stream& other_stream);
private:
friend void StreamWait(StreamId stream_id, const timespec *due_time,
@@ -134,6 +134,7 @@ friend struct butil::DefaultDeleter<Stream>;
butil::IOBuf *_pending_buf;
int64_t _start_idle_timer_us;
bthread_timer_t _idle_timer;
+ std::once_flag _set_host_socket_flag;
};
} // namespace brpc
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]