This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 15469ed3 Support connect on socket create (#2574)
15469ed3 is described below
commit 15469ed314f22e4cc598e0f0abdde64463e0b768
Author: Bright Chen <[email protected]>
AuthorDate: Thu Jun 13 12:23:28 2024 +0800
Support connect on socket create (#2574)
---
src/brpc/rtmp.cpp | 2 +-
src/brpc/selective_channel.cpp | 20 +++++---
src/brpc/socket.cpp | 65 +++++++++++++++++++------
src/brpc/socket.h | 27 ++++++++---
src/brpc/socket_inl.h | 2 +
src/brpc/socket_map.cpp | 17 ++++---
src/brpc/versioned_ref_with_id.h | 5 ++
test/brpc_socket_unittest.cpp | 21 ++++----
test/brpc_ssl_unittest.cpp | 102 ++++++++++++++++++++++++++++++++++++---
9 files changed, 207 insertions(+), 54 deletions(-)
diff --git a/src/brpc/rtmp.cpp b/src/brpc/rtmp.cpp
index ae6eb6ad..4913881c 100644
--- a/src/brpc/rtmp.cpp
+++ b/src/brpc/rtmp.cpp
@@ -1087,7 +1087,7 @@ public:
: _connect_options(connect_options) {
}
- int CreateSocket(const SocketOptions& opt, SocketId* id) {
+ int CreateSocket(const SocketOptions& opt, SocketId* id) override {
SocketOptions sock_opt = opt;
sock_opt.app_connect = std::make_shared<RtmpConnect>();
sock_opt.initial_parsing_context = new
policy::RtmpContext(&_connect_options, NULL);
diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp
index 9ad5f9a0..5a815821 100644
--- a/src/brpc/selective_channel.cpp
+++ b/src/brpc/selective_channel.cpp
@@ -158,8 +158,8 @@ private:
ChannelBalancer::~ChannelBalancer() {
for (ChannelToIdMap::iterator
it = _chan_map.begin(); it != _chan_map.end(); ++it) {
- SocketUniquePtr ptr(it->second); // Dereference
it->second->ReleaseAdditionalReference();
+ it->second->ReleaseHCRelatedReference();
}
_chan_map.clear();
}
@@ -196,15 +196,21 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
return -1;
}
SocketUniquePtr ptr;
- CHECK_EQ(0, Socket::Address(sock_id, &ptr));
+ int rc = Socket::AddressFailedAsWell(sock_id, &ptr);
+ if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
+ LOG(FATAL) << "Fail to address SocketId=" << sock_id;
+ return -1;
+ }
if (!AddServer(ServerId(sock_id))) {
LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
// sub_chan will be deleted when the socket is recycled.
ptr->SetFailed();
+ // Cancel health checking.
+ ptr->ReleaseHCRelatedReference();
return -1;
}
- ptr->SetHCRelatedRefHeld(); // set held status
- _chan_map[sub_channel]= ptr.release(); // Add reference.
+ // The health-check-related reference has been held on created.
+ _chan_map[sub_channel]= ptr.get();
if (handle) {
*handle = sock_id;
}
@@ -223,13 +229,11 @@ void
ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha
BAIDU_SCOPED_LOCK(_mutex);
CHECK_EQ(1UL, _chan_map.erase(sub->chan));
}
- {
- ptr->SetHCRelatedRefReleased(); // set released status to cancel
health checking
- SocketUniquePtr ptr2(ptr.get()); // Dereference.
- }
if (rc == 0) {
ptr->ReleaseAdditionalReference();
}
+ // Cancel health checking.
+ ptr->ReleaseHCRelatedReference();
}
}
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 85aa1507..ac1c37ae 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -577,6 +577,10 @@ int Socket::ResetFileDescriptor(int fd) {
if (!ValidFileDescriptor(fd)) {
return 0;
}
+ if (_remote_side == butil::EndPoint()) {
+ // OK to fail, non-socket fd does not support this.
+ butil::get_remote_side(fd, &_remote_side);
+ }
// OK to fail, non-socket fd does not support this.
if (butil::get_local_side(fd, &_local_side) != 0) {
_local_side = butil::EndPoint();
@@ -781,6 +785,19 @@ int Socket::OnCreated(const SocketOptions& options) {
_keepalive_options = options.keepalive_options;
CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
_is_write_shutdown = false;
+ int fd = options.fd;
+ if (!ValidFileDescriptor(fd) && options.connect_on_create) {
+ // Connect on create.
+ fd = DoConnect(options.connect_abstime, NULL, NULL);
+ if (fd < 0) {
+ PLOG(ERROR) << "Fail to connect to " << options.remote_side;
+ int error_code = errno != 0 ? errno : EHOSTDOWN;
+ SetFailed(error_code, "Fail to connect to %s: %s",
+ butil::endpoint2str(options.remote_side).c_str(),
+ berror(error_code));
+ return -1;
+ }
+ }
// Must be the last one! Internal fields of this Socket may be accessed
// just after calling ResetFileDescriptor.
if (ResetFileDescriptor(options.fd) != 0) {
@@ -790,6 +807,7 @@ int Socket::OnCreated(const SocketOptions& options) {
berror(saved_errno));
return -1;
}
+ HoldHCRelatedRef();
guard.dismiss();
return 0;
@@ -940,6 +958,20 @@ std::string Socket::OnDescription() const {
return result;
}
+void Socket::HoldHCRelatedRef() {
+ if (_health_check_interval_s > 0) {
+ _is_hc_related_ref_held = true;
+ AddReference();
+ }
+}
+
+void Socket::ReleaseHCRelatedReference() {
+ if (_health_check_interval_s > 0) {
+ _is_hc_related_ref_held = false;
+ Dereference();
+ }
+}
+
int Socket::WaitAndReset(int32_t expected_nref) {
const uint32_t id_ver = VersionOfVRefId(id());
uint64_t vref;
@@ -1350,16 +1382,27 @@ int Socket::CheckConnected(int sockfd) {
return -1;
}
- butil::EndPoint local_point;
- CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
- LOG_IF(INFO, FLAGS_log_connected)
- << "Connected to " << remote_side()
- << " via fd=" << (int)sockfd << " SocketId=" << id()
- << " local_side=" << local_point;
+ if (FLAGS_log_connected) {
+ butil::EndPoint local_point;
+ CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
+ LOG(INFO) << "Connected to " << remote_side()
+ << " via fd=" << (int)sockfd << " SocketId=" << id()
+ << " local_side=" << local_point;
+ }
+
// Doing SSL handshake after TCP connected
return SSLHandshake(sockfd, false);
}
+int Socket::DoConnect(const timespec* abstime,
+ int (*on_connect)(int, int, void*), void* data) {
+ if (_conn) {
+ return _conn->Connect(this, abstime, on_connect, data);
+ } else {
+ return Connect(abstime, on_connect, data);
+ }
+}
+
int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
if (_fd.load(butil::memory_order_consume) >= 0) {
return 0;
@@ -1370,14 +1413,8 @@ int Socket::ConnectIfNot(const timespec* abstime,
WriteRequest* req) {
SocketUniquePtr s;
ReAddress(&s);
req->set_socket(s.get());
- if (_conn) {
- if (_conn->Connect(this, abstime, KeepWriteIfConnected, req) < 0) {
- return -1;
- }
- } else {
- if (Connect(abstime, KeepWriteIfConnected, req) < 0) {
- return -1;
- }
+ if (DoConnect(abstime, KeepWriteIfConnected, req) < 0) {
+ return -1;
}
s.release();
return 1;
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index 6d9c8f11..2a6ab748 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -256,6 +256,14 @@ struct SocketOptions {
// user->BeforeRecycle() before recycling.
int fd;
butil::EndPoint remote_side;
+ // If `connect_on_create' is true and `fd' is less than 0,
+ // a client connection will be established to remote_side()
+ // regarding deadline `connect_abstime' when Socket is being created.
+ // Default: false, means that a connection will be established
+ // on first write.
+ bool connect_on_create;
+ // Default: NULL, means no timeout.
+ const timespec* connect_abstime;
SocketUser* user;
// When *edge-triggered* events happen on the file descriptor, callback
// `on_edge_triggered_events' will be called. Inside the callback, user
@@ -409,16 +417,15 @@ public:
// True if health checking is enabled.
bool HCEnabled() const {
+ // This fence makes sure that we see change of
+ // `_is_hc_related_ref_held' before changing `_versioned_ref.
+ butil::atomic_thread_fence(butil::memory_order_acquire);
return _health_check_interval_s > 0 && _is_hc_related_ref_held;
}
- // When someone holds a health-checking-related reference,
- // this function need to be called to make health checking run normally.
- void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
- // When someone releases the health-checking-related reference,
- // this function need to be called to cancel health checking.
- void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; }
- bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; }
+ // Release the health-checking-related
+ // reference which is held on created.
+ void ReleaseHCRelatedReference();
// After health checking is complete, set _hc_started to false.
void AfterHCCompleted() { _hc_started.store(false,
butil::memory_order_relaxed); }
@@ -665,6 +672,9 @@ private:
std::string OnDescription() const;
+ // Hold the health-checking-related
+ // reference on created.
+ void HoldHCRelatedRef();
static int Status(SocketId, int32_t* nref = NULL); // for unit-test.
@@ -699,8 +709,11 @@ private:
// starting a connection request and `on_connect' will be called
// when connecting completes (whether it succeeds or not)
// Returns the socket fd on success, -1 otherwise
+ int DoConnect(const timespec* abstime,
+ int (*on_connect)(int fd, int err, void* data), void* data);
int Connect(const timespec* abstime,
int (*on_connect)(int fd, int err, void* data), void* data);
+
int CheckConnected(int sockfd);
// [Not thread-safe] Only used by `Write'.
diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h
index a8ff3ce8..d704b900 100644
--- a/src/brpc/socket_inl.h
+++ b/src/brpc/socket_inl.h
@@ -25,6 +25,8 @@ namespace brpc {
inline SocketOptions::SocketOptions()
: fd(-1)
+ , connect_on_create(false)
+ , connect_abstime(NULL)
, user(NULL)
, on_edge_triggered_events(NULL)
, health_check_interval_s(-1)
diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp
index 774bf5a7..609d2776 100644
--- a/src/brpc/socket_map.cpp
+++ b/src/brpc/socket_map.cpp
@@ -58,7 +58,7 @@ static butil::static_atomic<SocketMap*> g_socket_map =
BUTIL_STATIC_ATOMIC_INIT(
class GlobalSocketCreator : public SocketCreator {
public:
- int CreateSocket(const SocketOptions& opt, SocketId* id) {
+ int CreateSocket(const SocketOptions& opt, SocketId* id) override {
SocketOptions sock_opt = opt;
sock_opt.health_check_interval_s = FLAGS_health_check_interval;
return get_client_side_messenger()->Create(sock_opt, id);
@@ -237,8 +237,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
return 0;
}
// A socket w/o HC is failed (permanently), replace it.
- sc->socket->SetHCRelatedRefReleased(); // set released status to
cancel health checking
- SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion.
+ sc->socket->ReleaseHCRelatedReference();
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
// below have to remove the entry before returning, which is
@@ -258,12 +257,15 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId*
id,
// use SocketUniquePtr which cannot put into containers before c++11.
// The ref will be removed at entry's removal.
SocketUniquePtr ptr;
- if (Socket::Address(tmp_id, &ptr) != 0) {
+ int rc = Socket::AddressFailedAsWell(tmp_id, &ptr);
+ if (rc < 0) {
LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
return -1;
+ } else if (rc > 0 && !ptr->HCEnabled()) {
+ LOG(FATAL) << "Failed socket is not HC-enabled";
+ return -1;
}
- ptr->SetHCRelatedRefHeld(); // set held status
- SingleConnection new_sc = { 1, ptr.release(), 0 };
+ SingleConnection new_sc = { 1, ptr.get(), 0 };
_map[key] = new_sc;
*id = tmp_id;
mu.unlock();
@@ -301,8 +303,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
_map.erase(key);
mu.unlock();
s->ReleaseAdditionalReference(); // release extra ref
- s->SetHCRelatedRefReleased(); // set released status to cancel
health checking
- SocketUniquePtr ptr(s); // Dereference
+ s->ReleaseHCRelatedReference();
}
}
}
diff --git a/src/brpc/versioned_ref_with_id.h b/src/brpc/versioned_ref_with_id.h
index 38141f3d..c78f019b 100644
--- a/src/brpc/versioned_ref_with_id.h
+++ b/src/brpc/versioned_ref_with_id.h
@@ -296,6 +296,11 @@ friend void DereferenceVersionedRefWithId<>(T* r);
// it will be recycled automatically and T::BeforeRecycled() will be
called.
int Dereference();
+ // Increase the reference count by 1.
+ void AddReference() {
+ _versioned_ref.fetch_add(1, butil::memory_order_release);
+ }
+
// Make this socket addressable again.
// If nref is less than `at_least_nref', VersionedRefWithId was
// abandoned during revival and cannot be revived.
diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp
index f278c46b..e83eef7e 100644
--- a/test/brpc_socket_unittest.cpp
+++ b/test/brpc_socket_unittest.cpp
@@ -504,7 +504,6 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
{
brpc::SocketUniquePtr s;
ASSERT_EQ(0, brpc::Socket::Address(id, &s));
- s->SetHCRelatedRefHeld(); // set held status
global_sock = s.get();
ASSERT_TRUE(s.get());
ASSERT_EQ(-1, s->fd());
@@ -542,6 +541,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
#endif
ASSERT_TRUE(src.empty());
ASSERT_EQ(-1, s->fd());
+ s->ReleaseHCRelatedReference();
}
// StartHealthCheck is possibly still running. Spin until global_sock
// is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should
@@ -650,12 +650,14 @@ TEST_F(SocketTest, health_check) {
options.user = new CheckRecycle;
options.health_check_interval_s = kCheckInteval/*s*/;
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
- brpc::SocketUniquePtr s;
- ASSERT_EQ(0, brpc::Socket::Address(id, &s));
-
- s->SetHCRelatedRefHeld(); // set held status
- global_sock = s.get();
- ASSERT_TRUE(s.get());
+ brpc::Socket* s = NULL;
+ {
+ brpc::SocketUniquePtr ptr;
+ ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
+ s = ptr.get();
+ }
+ global_sock = s;
+ ASSERT_NE(nullptr, s);
ASSERT_EQ(-1, s->fd());
ASSERT_EQ(point, s->remote_side());
ASSERT_EQ(id, s->id());
@@ -763,7 +765,7 @@ TEST_F(SocketTest, health_check) {
ASSERT_NE(0, ptr->fd());
}
- s.release()->Dereference();
+ s->ReleaseHCRelatedReference();
// Must stop messenger before SetFailed the id otherwise StartHealthCheck
// still has chance to get reconnected and revive the id.
@@ -779,7 +781,8 @@ TEST_F(SocketTest, health_check) {
bthread_usleep(1000);
ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
}
- ASSERT_EQ(-1, brpc::Socket::Status(id));
+ nref = 0;
+ ASSERT_EQ(-1, brpc::Socket::Status(id, &nref)) << "nref=" << nref;
// The id is invalid.
brpc::SocketUniquePtr ptr;
ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
diff --git a/test/brpc_ssl_unittest.cpp b/test/brpc_ssl_unittest.cpp
index e101f534..3c64ba76 100644
--- a/test/brpc_ssl_unittest.cpp
+++ b/test/brpc_ssl_unittest.cpp
@@ -26,6 +26,9 @@
#include <butil/macros.h>
#include <butil/fd_guard.h>
#include <butil/files/scoped_file.h>
+#include <brpc/policy/baidu_rpc_meta.pb.h>
+#include <brpc/policy/baidu_rpc_protocol.h>
+#include <brpc/policy/most_common_message.h>
#include "brpc/global.h"
#include "brpc/socket.h"
#include "brpc/server.h"
@@ -54,11 +57,11 @@ const std::string EXP_RESPONSE = "world";
class EchoServiceImpl : public test::EchoService {
public:
EchoServiceImpl() : count(0) {}
- virtual ~EchoServiceImpl() { g_delete = true; }
- virtual void Echo(google::protobuf::RpcController* cntl_base,
- const test::EchoRequest* request,
- test::EchoResponse* response,
- google::protobuf::Closure* done) {
+ ~EchoServiceImpl() override { g_delete = true; }
+ void Echo(google::protobuf::RpcController* cntl_base,
+ const test::EchoRequest* request,
+ test::EchoResponse* response,
+ google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = (brpc::Controller*)cntl_base;
count.fetch_add(1, butil::memory_order_relaxed);
@@ -207,7 +210,7 @@ TEST_F(SSLTest, force_ssl) {
test::EchoService_Stub stub(&channel);
test::EchoResponse res;
stub.Echo(&cntl, &req, &res, NULL);
- EXPECT_EQ(EXP_RESPONSE, res.message()) << cntl.ErrorText();
+ ASSERT_EQ(EXP_RESPONSE, res.message()) << cntl.ErrorText();
}
{
@@ -218,13 +221,98 @@ TEST_F(SSLTest, force_ssl) {
test::EchoService_Stub stub(&channel);
test::EchoResponse res;
stub.Echo(&cntl, &req, &res, NULL);
- EXPECT_TRUE(cntl.Failed());
+ ASSERT_TRUE(cntl.Failed());
}
ASSERT_EQ(0, server.Stop(0));
ASSERT_EQ(0, server.Join());
}
+void ProcessResponse(brpc::InputMessageBase* msg_base) {
+ brpc::DestroyingPtr<brpc::policy::MostCommonMessage> msg(
+ static_cast<brpc::policy::MostCommonMessage*>(msg_base));
+ brpc::policy::RpcMeta meta;
+ ASSERT_TRUE(brpc::ParsePbFromIOBuf(&meta, msg->meta));
+ const brpc::policy::RpcResponseMeta &response_meta = meta.response();
+ ASSERT_EQ(0, response_meta.error_code()) << response_meta.error_text();
+
+ const brpc::CallId cid = { static_cast<uint64_t>(meta.correlation_id()) };
+ brpc::Controller* cntl = NULL;
+ ASSERT_EQ(0, bthread_id_lock(cid, (void**)&cntl));
+ ASSERT_NE(nullptr, cntl);
+ ASSERT_TRUE(brpc::ParsePbFromIOBuf(cntl->response(), msg->payload));
+ ASSERT_EQ(0, bthread_id_unlock_and_destroy(cid));
+}
+
+TEST_F(SSLTest, connect_on_create) {
+ brpc::Protocol dummy_protocol = {
+ brpc::policy::ParseRpcMessage, brpc::SerializeRequestDefault,
+ brpc::policy::PackRpcRequest,NULL, ProcessResponse,
+ NULL, NULL, NULL, brpc::CONNECTION_TYPE_ALL, "ssl_ut_baidu"
+ };
+ ASSERT_EQ(0, RegisterProtocol((brpc::ProtocolType)30, dummy_protocol));
+
+ brpc::InputMessageHandler dummy_handler ={
+ dummy_protocol.parse, dummy_protocol.process_response,
+ NULL, NULL, dummy_protocol.name
+ };
+ brpc::InputMessenger messenger;
+ ASSERT_EQ(0, messenger.AddHandler(dummy_handler));
+
+ const int port = 8613;
+ brpc::Server server;
+ brpc::ServerOptions server_options;
+ server_options.force_ssl = true;
+
+ brpc::CertInfo cert;
+ cert.certificate = "cert1.crt";
+ cert.private_key = "cert1.key";
+ server_options.mutable_ssl_options()->default_cert = cert;
+
+ EchoServiceImpl echo_svc;
+ ASSERT_EQ(0, server.AddService(
+ &echo_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
+ ASSERT_EQ(0, server.Start(port, &server_options));
+
+ // Create client socket.
+ brpc::SocketOptions socket_options;
+ butil::EndPoint ep(butil::IP_ANY, port);
+ socket_options.remote_side = ep;
+ socket_options.connect_on_create = true;
+ socket_options.on_edge_triggered_events =
brpc::InputMessenger::OnNewMessages;
+ socket_options.user = &messenger;
+ brpc::ChannelSSLOptions ssl_options;
+ SSL_CTX* raw_ctx = brpc::CreateClientSSLContext(ssl_options);
+ ASSERT_NE(nullptr, raw_ctx);
+ std::shared_ptr<brpc::SocketSSLContext> ssl_ctx
+ = std::make_shared<brpc::SocketSSLContext>();
+ ssl_ctx->raw_ctx = raw_ctx;
+ socket_options.initial_ssl_ctx = ssl_ctx;
+
+ brpc::SocketId socket_id;
+ ASSERT_EQ(0, brpc::Socket::Create(socket_options, &socket_id));
+ brpc::SocketUniquePtr ptr;
+ ASSERT_EQ(0, brpc::Socket::Address(socket_id, &ptr));
+
+ test::EchoRequest req;
+ req.set_message(EXP_REQUEST);
+ for (int i = 0; i < 100; ++i) {
+ test::EchoResponse res;
+ butil::IOBuf request_buf;
+ butil::IOBuf request_body;
+ brpc::Controller cntl;
+ cntl._response = &res;
+ const brpc::CallId correlation_id = cntl.call_id();
+ brpc::SerializeRequestDefault(&request_body, &cntl, &req);
+ brpc::policy::PackRpcRequest(&request_buf, NULL, correlation_id.value,
+
test::EchoService_Stub::descriptor()->method(0),
+ &cntl, request_body, NULL);
+ ASSERT_EQ(0, ptr->Write(&request_buf));
+ brpc::Join(correlation_id);
+ ASSERT_EQ(EXP_RESPONSE, res.message());
+ }
+}
+
void CheckCert(const char* cname, const char* cert) {
const int port = 8613;
brpc::Channel channel;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]