This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 0c4f2a89 Fix some unstable UTs (#2928)
0c4f2a89 is described below
commit 0c4f2a897cf159cb900ac4a86636fde0b72395a2
Author: Bright Chen <[email protected]>
AuthorDate: Fri Mar 28 23:13:42 2025 +0800
Fix some unstable UTs (#2928)
---
src/brpc/socket.cpp | 2 +-
test/brpc_builtin_service_unittest.cpp | 17 +++++-
test/brpc_redis_unittest.cpp | 76 ++++++++---------------
test/brpc_socket_unittest.cpp | 107 ++++++++++++++++++++++-----------
test/brpc_streaming_rpc_unittest.cpp | 28 ++++-----
test/bthread_fd_unittest.cpp | 2 +-
test/bthread_unittest.cpp | 107 +++++++++++++++++++++------------
test/endpoint_unittest.cpp | 4 +-
8 files changed, 194 insertions(+), 149 deletions(-)
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 5e969fc8..e7cc336a 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -812,7 +812,7 @@ int Socket::OnCreated(const SocketOptions& options) {
}
// Must be the last one! Internal fields of this Socket may be accessed
// just after calling ResetFileDescriptor.
- if (ResetFileDescriptor(options.fd) != 0) {
+ if (ResetFileDescriptor(fd) != 0) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to ResetFileDescriptor";
SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
diff --git a/test/brpc_builtin_service_unittest.cpp
b/test/brpc_builtin_service_unittest.cpp
index 638e5d4c..76b99811 100644
--- a/test/brpc_builtin_service_unittest.cpp
+++ b/test/brpc_builtin_service_unittest.cpp
@@ -841,8 +841,10 @@ void* dummy_bthread(void*) {
#ifdef BRPC_BTHREAD_TRACER
+bool g_bthread_trace_start = false;
bool g_bthread_trace_stop = false;
void* bthread_trace(void*) {
+ g_bthread_trace_start = true;
while (!g_bthread_trace_stop) {
bthread_usleep(1000 * 100);
}
@@ -883,9 +885,13 @@ TEST_F(BuiltinServiceTest, bthreads) {
}
#ifdef BRPC_BTHREAD_TRACER
- {
+ bool ok = false;
+ for (int i = 0; i < 10; ++i) {
bthread_t th;
EXPECT_EQ(0, bthread_start_background(&th, NULL, bthread_trace, NULL));
+ while (!g_bthread_trace_start) {
+ bthread_usleep(1000 * 10);
+ }
ClosureChecker done;
brpc::Controller cntl;
std::string id_string;
@@ -895,9 +901,14 @@ TEST_F(BuiltinServiceTest, bthreads) {
service.default_method(&cntl, &req, &res, &done);
g_bthread_trace_stop = true;
EXPECT_FALSE(cntl.Failed());
- CheckContent(cntl, "stop=0");
- CheckContent(cntl, "bthread_trace");
+ const std::string& content = cntl.response_attachment().to_string();
+ ok = content.find("stop=0") != std::string::npos &&
+ content.find("bthread_trace") != std::string::npos;
+ if (ok) {
+ break;
+ }
}
+ ASSERT_TRUE(ok);
#endif // BRPC_BTHREAD_TRACER
}
diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp
index 017d5c7e..7a48d19a 100644
--- a/test/brpc_redis_unittest.cpp
+++ b/test/brpc_redis_unittest.cpp
@@ -814,10 +814,9 @@ std::unordered_map<std::string, int64_t> int_map;
class RedisServiceImpl : public brpc::RedisService {
public:
- RedisServiceImpl()
+ RedisServiceImpl(std::string password)
: _batch_count(0)
- , _user("user1")
- , _password("password1") {}
+ , _password(std::move(password)) {}
brpc::RedisCommandHandlerResult OnBatched(const
std::vector<butil::StringPiece>& args,
brpc::RedisReply* output, bool flush_batched) {
@@ -867,21 +866,19 @@ public:
std::vector<std::vector<std::string> > _batched_command;
int _batch_count;
- std::string _user;
std::string _password;
};
class AuthSession : public brpc::Destroyable {
public:
- explicit AuthSession(const std::string& user_name, const std::string&
password)
- : _user_name(user_name), _password(password) {}
+ explicit AuthSession(std::string password)
+ : _password(std::move(password)) {}
void Destroy() override {
delete this;
}
- const std::string _user_name;
const std::string _password;
};
@@ -894,17 +891,16 @@ public:
const std::vector<butil::StringPiece>&
args,
brpc::RedisReply* output,
bool flush_batched) {
- if (args.size() < 2) {
+ if (args.size() < 1) {
output->SetError("ERR wrong number of arguments for 'AUTH'
command");
return brpc::REDIS_CMD_HANDLED;
}
- const std::string user(args[1].data(), args[1].size());
- const std::string password(args[2].data(), args[2].size());
- if (_rs->_user != user || _rs->_password != password) {
+ const std::string password(args[1].data(), args[1].size());
+ if (_rs->_password != password) {
output->SetError("ERR invalid username/password");
return brpc::REDIS_CMD_HANDLED;
}
- auto auth_session = new AuthSession(user, password);
+ auto auth_session = new AuthSession(password);
ctx->reset_session(auth_session);
output->SetStatus("OK");
return brpc::REDIS_CMD_HANDLED;
@@ -929,7 +925,7 @@ public:
return brpc::REDIS_CMD_HANDLED;
}
AuthSession* session = static_cast<AuthSession*>(ctx->session);
- if (!session || (session->_password != _rs->_password) ||
(session->_user_name != _rs->_user)) {
+ if (!session || (session->_password != _rs->_password)) {
output->SetError("ERR no auth");
return brpc::REDIS_CMD_HANDLED;
}
@@ -971,7 +967,7 @@ public:
return brpc::REDIS_CMD_HANDLED;
}
AuthSession* session = static_cast<AuthSession*>(ctx->session);
- if (!session || (session->_password != _rs->_password) ||
(session->_user_name != _rs->_user)) {
+ if (session->_password != _rs->_password) {
output->SetError("ERR no auth");
return brpc::REDIS_CMD_HANDLED;
}
@@ -1015,7 +1011,7 @@ public:
return brpc::REDIS_CMD_HANDLED;
}
AuthSession* session = static_cast<AuthSession*>(ctx->session);
- if (!session || (session->_password != _rs->_password) ||
(session->_user_name != _rs->_user)) {
+ if (session->_password != _rs->_password) {
output->SetError("ERR no auth");
return brpc::REDIS_CMD_HANDLED;
}
@@ -1036,9 +1032,10 @@ private:
};
TEST_F(RedisTest, server_sanity) {
+ std::string password = GeneratePassword();
brpc::Server server;
brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl;
+ RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
GetCommandHandler *gh = new GetCommandHandler(rsimpl);
SetCommandHandler *sh = new SetCommandHandler(rsimpl);
AuthCommandHandler *ah = new AuthCommandHandler(rsimpl);
@@ -1053,21 +1050,13 @@ TEST_F(RedisTest, server_sanity) {
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
+ options.auth = new brpc::policy::RedisAuthenticator(password);
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port,
&options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
- ASSERT_TRUE(request.AddCommand("auth user1 password1"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(1, response.reply_size());
- ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
- ASSERT_STREQ("OK", response.reply(0).c_str());
- request.Clear();
- response.Clear();
- cntl.Reset();
ASSERT_TRUE(request.AddCommand("get hello"));
ASSERT_TRUE(request.AddCommand("get hello2"));
ASSERT_TRUE(request.AddCommand("set key1 value1"));
@@ -1122,13 +1111,6 @@ TEST_F(RedisTest, server_sanity) {
void* incr_thread(void* arg) {
brpc::Channel* c = static_cast<brpc::Channel*>(arg);
- // do auth
- brpc::RedisRequest auth_req;
- brpc::RedisResponse auth_resp;
- brpc::Controller auth_cntl;
- EXPECT_TRUE(auth_req.AddCommand("auth user1 password1"));
- c->CallMethod(NULL, &auth_cntl, &auth_req, &auth_resp, NULL);
- EXPECT_FALSE(auth_cntl.Failed()) << auth_cntl.ErrorText();
for (int i = 0; i < 5000; ++i) {
brpc::RedisRequest request;
brpc::RedisResponse response;
@@ -1137,16 +1119,17 @@ void* incr_thread(void* arg) {
c->CallMethod(NULL, &cntl, &request, &response, NULL);
EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
EXPECT_EQ(1, response.reply_size());
- EXPECT_TRUE(response.reply(0).is_integer());
+ EXPECT_TRUE(response.reply(0).is_integer()) << response.reply(0);
}
return NULL;
}
TEST_F(RedisTest, server_concurrency) {
+ std::string password = GeneratePassword();
int N = 10;
brpc::Server server;
brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl;
+ RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
AuthCommandHandler *ah = new AuthCommandHandler(rsimpl);
IncrCommandHandler *ih = new IncrCommandHandler(rsimpl);
rsimpl->AddCommandHandler("incr", ih);
@@ -1158,6 +1141,7 @@ TEST_F(RedisTest, server_concurrency) {
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
options.connection_type = "pooled";
+ options.auth = new brpc::policy::RedisAuthenticator(password);
std::vector<bthread_t> bths;
std::vector<brpc::Channel*> channels;
for (int i = 0; i < N; ++i) {
@@ -1228,9 +1212,10 @@ public:
};
TEST_F(RedisTest, server_command_continue) {
+ std::string password = GeneratePassword();
brpc::Server server;
brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl;
+ RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
rsimpl->AddCommandHandler("auth", new AuthCommandHandler(rsimpl));
rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl));
rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl));
@@ -1242,16 +1227,9 @@ TEST_F(RedisTest, server_command_continue) {
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
+ options.auth = new brpc::policy::RedisAuthenticator(password);
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port,
&options));
- // do auth
- brpc::RedisRequest auth_req;
- brpc::RedisResponse auth_resp;
- brpc::Controller auth_cntl;
- ASSERT_TRUE(auth_req.AddCommand("auth user1 password1"));
- channel.CallMethod(NULL, &auth_cntl, &auth_req, &auth_resp, NULL);
- ASSERT_FALSE(auth_cntl.Failed()) << auth_cntl.ErrorText();
-
{
brpc::RedisRequest request;
brpc::RedisResponse response;
@@ -1311,9 +1289,10 @@ TEST_F(RedisTest, server_command_continue) {
}
TEST_F(RedisTest, server_handle_pipeline) {
+ std::string password = GeneratePassword();
brpc::Server server;
brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl;
+ RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
GetCommandHandler* getch = new GetCommandHandler(rsimpl, true);
SetCommandHandler* setch = new SetCommandHandler(rsimpl, true);
AuthCommandHandler* authch = new AuthCommandHandler(rsimpl);
@@ -1327,20 +1306,13 @@ TEST_F(RedisTest, server_handle_pipeline) {
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
+ options.auth = new brpc::policy::RedisAuthenticator(password);
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port,
&options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
- ASSERT_TRUE(request.AddCommand("auth user1 password1"));
- channel.CallMethod(NULL, &cntl, &request, &response, NULL);
- ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
- ASSERT_EQ(1, response.reply_size());
- ASSERT_STREQ("OK", response.reply(0).c_str());
- request.Clear();
- response.Clear();
- cntl.Reset();
ASSERT_TRUE(request.AddCommand("set key1 v1"));
ASSERT_TRUE(request.AddCommand("set key2 v2"));
ASSERT_TRUE(request.AddCommand("set key3 v3"));
diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp
index 78b83503..0f35863f 100644
--- a/test/brpc_socket_unittest.cpp
+++ b/test/brpc_socket_unittest.cpp
@@ -335,10 +335,17 @@ TEST_F(SocketTest, single_threaded_connect_and_write) {
EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" }
};
+ int listening_fd = -1;
butil::EndPoint point(butil::IP_ANY, 7878);
- int listening_fd = tcp_listen(point);
- ASSERT_TRUE(listening_fd > 0);
- butil::make_non_blocking(listening_fd);
+ for (int i = 0; i < 100; ++i) {
+ point.port += i;
+ listening_fd = tcp_listen(point);
+ if (listening_fd >= 0) {
+ break;
+ }
+ }
+ ASSERT_GT(listening_fd, 0) << berror();
+ ASSERT_EQ(0, butil::make_non_blocking(listening_fd));
ASSERT_EQ(0, messenger->AddHandler(pairs[0]));
ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL, false));
@@ -1130,6 +1137,7 @@ TEST_F(SocketTest, keepalive) {
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
CheckNoKeepalive(ptr->fd());
+ ASSERT_EQ(0, ptr->SetFailed());
}
int keepalive_idle = 1;
@@ -1148,6 +1156,7 @@ TEST_F(SocketTest, keepalive) {
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
default_keepalive_interval, default_keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
// Enable keepalive and set keepalive idle.
@@ -1157,8 +1166,7 @@ TEST_F(SocketTest, keepalive) {
brpc::SocketOptions options;
options.fd = sockfd;
options.keepalive_options =
std::make_shared<brpc::SocketKeepaliveOptions>();
- options.keepalive_options->keepalive_idle_s
- = keepalive_idle;
+ options.keepalive_options->keepalive_idle_s = keepalive_idle;
brpc::SocketId id;
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
brpc::SocketUniquePtr ptr;
@@ -1166,6 +1174,7 @@ TEST_F(SocketTest, keepalive) {
CheckKeepalive(ptr->fd(), true, keepalive_idle,
default_keepalive_interval,
default_keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
// Enable keepalive and set keepalive interval.
@@ -1175,14 +1184,14 @@ TEST_F(SocketTest, keepalive) {
brpc::SocketOptions options;
options.fd = sockfd;
options.keepalive_options =
std::make_shared<brpc::SocketKeepaliveOptions>();
- options.keepalive_options->keepalive_interval_s
- = keepalive_interval;
+ options.keepalive_options->keepalive_interval_s = keepalive_interval;
brpc::SocketId id;
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
keepalive_interval, default_keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
// Enable keepalive and set keepalive count.
@@ -1199,6 +1208,7 @@ TEST_F(SocketTest, keepalive) {
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
default_keepalive_interval, keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
// Enable keepalive and set keepalive idle, interval, count.
@@ -1217,10 +1227,25 @@ TEST_F(SocketTest, keepalive) {
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
CheckKeepalive(ptr->fd(), true, keepalive_idle,
keepalive_interval, keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
}
TEST_F(SocketTest, keepalive_input_message) {
+ brpc::Acceptor* messenger = new brpc::Acceptor;
+ int listening_fd = -1;
+ butil::EndPoint point(butil::IP_ANY, 7878);
+ for (int i = 0; i < 100; ++i) {
+ point.port += i;
+ listening_fd = tcp_listen(point);
+ if (listening_fd >= 0) {
+ break;
+ }
+ }
+ ASSERT_GT(listening_fd, 0) << berror();
+ ASSERT_EQ(0, butil::make_non_blocking(listening_fd));
+ ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL, false));
+
int default_keepalive = 0;
int default_keepalive_idle = 0;
int default_keepalive_interval = 0;
@@ -1234,76 +1259,81 @@ TEST_F(SocketTest, keepalive_input_message) {
// Disable keepalive.
{
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- ASSERT_GT(sockfd, 0);
brpc::SocketOptions options;
- options.fd = sockfd;
+ options.remote_side = point;
+ options.connect_on_create = true;
brpc::SocketId id = brpc::INVALID_SOCKET_ID;
ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options,
&id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+ ASSERT_GT(ptr->fd(), 0);
CheckNoKeepalive(ptr->fd());
+ ASSERT_EQ(0, ptr->SetFailed());
}
// Enable keepalive.
brpc::FLAGS_socket_keepalive = true;
{
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- ASSERT_GT(sockfd, 0);
brpc::SocketOptions options;
- options.fd = sockfd;
+ options.remote_side = point;
+ options.connect_on_create = true;
brpc::SocketId id = brpc::INVALID_SOCKET_ID;
ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options,
&id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+ ASSERT_GT(ptr->fd(), 0);
CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
default_keepalive_interval, default_keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
// Enable keepalive and set keepalive idle.
brpc::FLAGS_socket_keepalive_idle_s = 10;
{
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- ASSERT_GT(sockfd, 0);
brpc::SocketOptions options;
- options.fd = sockfd;
+ options.remote_side = point;
+ options.connect_on_create = true;
brpc::SocketId id = brpc::INVALID_SOCKET_ID;
ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options,
&id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+ ASSERT_GT(ptr->fd(), 0);
CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
default_keepalive_interval, default_keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
// Enable keepalive and set keepalive idle, interval.
brpc::FLAGS_socket_keepalive_interval_s = 10;
{
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- ASSERT_GT(sockfd, 0);
brpc::SocketOptions options;
- options.fd = sockfd;
+ options.remote_side = point;
+ options.connect_on_create = true;
brpc::SocketId id = brpc::INVALID_SOCKET_ID;
ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options,
&id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+ ASSERT_GT(ptr->fd(), 0);
CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
brpc::FLAGS_socket_keepalive_interval_s,
default_keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
// Enable keepalive and set keepalive idle, interval, count.
brpc::FLAGS_socket_keepalive_count = 10;
{
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- ASSERT_GT(sockfd, 0);
brpc::SocketOptions options;
- options.fd = sockfd;
+ options.remote_side = point;
+ options.connect_on_create = true;
brpc::SocketId id = brpc::INVALID_SOCKET_ID;
ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options,
&id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+ ASSERT_GT(ptr->fd(), 0);
CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
brpc::FLAGS_socket_keepalive_interval_s,
brpc::FLAGS_socket_keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
// Options of keepalive set by user have priority over Gflags.
@@ -1311,56 +1341,58 @@ TEST_F(SocketTest, keepalive_input_message) {
int keepalive_interval = 2;
int keepalive_count = 2;
{
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- ASSERT_GT(sockfd, 0);
brpc::SocketOptions options;
- options.fd = sockfd;
+ options.remote_side = point;
+ options.connect_on_create = true;
options.keepalive_options =
std::make_shared<brpc::SocketKeepaliveOptions>();
options.keepalive_options->keepalive_idle_s = keepalive_idle;
brpc::SocketId id = brpc::INVALID_SOCKET_ID;
ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options,
&id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+ ASSERT_GT(ptr->fd(), 0);
CheckKeepalive(ptr->fd(), true, keepalive_idle,
brpc::FLAGS_socket_keepalive_interval_s,
brpc::FLAGS_socket_keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
{
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- ASSERT_GT(sockfd, 0);
brpc::SocketOptions options;
- options.fd = sockfd;
+ options.remote_side = point;
+ options.connect_on_create = true;
options.keepalive_options =
std::make_shared<brpc::SocketKeepaliveOptions>();
options.keepalive_options->keepalive_interval_s = keepalive_interval;
brpc::SocketId id = brpc::INVALID_SOCKET_ID;
ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options,
&id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+ ASSERT_GT(ptr->fd(), 0);
CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
keepalive_interval, brpc::FLAGS_socket_keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
{
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- ASSERT_GT(sockfd, 0);
brpc::SocketOptions options;
- options.fd = sockfd;
+ options.remote_side = point;
+ options.connect_on_create = true;
options.keepalive_options =
std::make_shared<brpc::SocketKeepaliveOptions>();
options.keepalive_options->keepalive_count = keepalive_count;
brpc::SocketId id = brpc::INVALID_SOCKET_ID;
ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options,
&id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+ ASSERT_GT(ptr->fd(), 0);
CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
brpc::FLAGS_socket_keepalive_interval_s,
keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
{
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- ASSERT_GT(sockfd, 0);
brpc::SocketOptions options;
- options.fd = sockfd;
+ options.remote_side = point;
+ options.connect_on_create = true;
options.keepalive_options =
std::make_shared<brpc::SocketKeepaliveOptions>();
options.keepalive_options->keepalive_idle_s = keepalive_idle;
options.keepalive_options->keepalive_interval_s = keepalive_interval;
@@ -1369,9 +1401,16 @@ TEST_F(SocketTest, keepalive_input_message) {
ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options,
&id));
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+ ASSERT_GT(ptr->fd(), 0);
CheckKeepalive(ptr->fd(), true, keepalive_idle,
keepalive_interval, keepalive_count);
+ ASSERT_EQ(0, ptr->SetFailed());
}
+
+ messenger->StopAccept(0);
+ ASSERT_EQ(-1, messenger->listened_fd());
+ ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD));
+ ASSERT_EQ(EBADF, errno);
}
#if defined(OS_LINUX)
diff --git a/test/brpc_streaming_rpc_unittest.cpp
b/test/brpc_streaming_rpc_unittest.cpp
index b0dd4a39..056ea9a9 100644
--- a/test/brpc_streaming_rpc_unittest.cpp
+++ b/test/brpc_streaming_rpc_unittest.cpp
@@ -240,7 +240,6 @@ TEST_F(StreamingRpcTest, block) {
out.append(&dummy, sizeof(dummy));
ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out));
hc.block = false;
- ASSERT_EQ(0, brpc::StreamWait(request_stream, NULL));
// wait flushing all the pending messages
while (handler._expected_next_value != N) {
usleep(100);
@@ -249,6 +248,7 @@ TEST_F(StreamingRpcTest, block) {
hc.block = true;
// async wait
for (int i = N; i < N + N; ++i) {
+ ASSERT_EQ(0, brpc::StreamWait(request_stream, NULL));
int network = htonl(i);
butil::IOBuf out;
out.append(&network, sizeof(network));
@@ -432,18 +432,12 @@ TEST_F(StreamingRpcTest, idle_timeout) {
class PingPongHandler : public brpc::StreamInputHandler {
public:
- explicit PingPongHandler()
- : _expected_next_value(0)
- , _failed(false)
- , _stopped(false)
- , _idle_times(0)
- {
- }
int on_received_messages(brpc::StreamId id,
butil::IOBuf *const messages[],
size_t size) override {
if (size != 1) {
- _failed = true;
+ LOG(INFO) << "size=" << size;
+ _error = true;
return 0;
}
for (size_t i = 0; i < size; ++i) {
@@ -451,7 +445,7 @@ public:
int network = 0;
messages[i]->cutn(&network, sizeof(int));
if ((int)ntohl(network) != _expected_next_value) {
- _failed = true;
+ _error = true;
}
int send_back = ntohl(network) + 1;
_expected_next_value = send_back + 1;
@@ -481,14 +475,16 @@ public:
_failed = true;
}
+ bool error() const { return _error; }
bool failed() const { return _failed; }
bool stopped() const { return _stopped; }
int idle_times() const { return _idle_times; }
private:
- int _expected_next_value;
- bool _failed;
- bool _stopped;
- int _idle_times;
+ int _expected_next_value{0};
+ bool _error{false};
+ bool _failed{false};
+ bool _stopped{false};
+ int _idle_times{0};
};
TEST_F(StreamingRpcTest, ping_pong) {
@@ -524,8 +520,8 @@ TEST_F(StreamingRpcTest, ping_pong) {
while (!resh.stopped() || !reqh.stopped()) {
usleep(100);
}
- ASSERT_FALSE(resh.failed());
- ASSERT_FALSE(reqh.failed());
+ ASSERT_FALSE(resh.error());
+ ASSERT_FALSE(reqh.error());
ASSERT_EQ(0, resh.idle_times());
ASSERT_EQ(0, reqh.idle_times());
}
diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp
index 91b89c21..fac6a4f2 100644
--- a/test/bthread_fd_unittest.cpp
+++ b/test/bthread_fd_unittest.cpp
@@ -616,7 +616,7 @@ void TestConnectInterruptImpl(bool timed) {
int64_t connect_ms = butil::cpuwide_time_ms() - start_ms;
LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms";
- timespec abstime = butil::milliseconds_from_now(connect_ms + 100);
+ timespec abstime = butil::milliseconds_from_now(connect_ms * 10);
rc = bthread_timed_connect(
sockfd, (struct sockaddr*) &serv_addr,
serv_addr_size, &abstime);
diff --git a/test/bthread_unittest.cpp b/test/bthread_unittest.cpp
index 1fbefb27..eaa16fa5 100644
--- a/test/bthread_unittest.cpp
+++ b/test/bthread_unittest.cpp
@@ -600,8 +600,9 @@ TEST_F(BthreadTest, test_span) {
test_son_parent_span, &multi_p2));
ASSERT_EQ(0, bthread_join(multi_th1, NULL));
ASSERT_EQ(0, bthread_join(multi_th2, NULL));
- ASSERT_EQ(multi_p1, targets[0]);
- ASSERT_EQ(multi_p2, targets[1]);
+ ASSERT_NE(multi_p1, multi_p2);
+ ASSERT_NE(std::find(targets, targets + 4, multi_p1), targets + 4);
+ ASSERT_NE(std::find(targets, targets + 4, multi_p2), targets + 4);
}
void* dummy_thread(void*) {
@@ -628,48 +629,74 @@ TEST_F(BthreadTest, yield_single_thread) {
}
#ifdef BRPC_BTHREAD_TRACER
-TEST_F(BthreadTest, trace) {
- start = false;
- stop = false;
- bthread_t th;
- ASSERT_EQ(0, bthread_start_urgent(&th, NULL, spin_and_log, (void*)1));
- while (!start) {
- usleep(10 * 1000);
- }
- bthread::FLAGS_enable_fast_unwind = false;
- std::string st = bthread::stack_trace(th);
- LOG(INFO) << "fast_unwind spin_and_log stack trace:\n" << st;
- ASSERT_NE(std::string::npos, st.find("spin_and_log"));
-
- bthread::FLAGS_enable_fast_unwind = true;
- st = bthread::stack_trace(th);
- LOG(INFO) << "spin_and_log stack trace:\n" << st;
- ASSERT_NE(std::string::npos, st.find("spin_and_log"));
- stop = true;
- ASSERT_EQ(0, bthread_join(th, NULL));
+void spin_and_log_trace() {
+ bool ok = false;
+ for (int i = 0; i < 10; ++i) {
+ start = false;
+ stop = false;
+ bthread_t th;
+ ASSERT_EQ(0, bthread_start_urgent(&th, NULL, spin_and_log, (void*)1));
+ while (!start) {
+ usleep(10 * 1000);
+ }
+ bthread::FLAGS_enable_fast_unwind = false;
+ std::string st1 = bthread::stack_trace(th);
+ LOG(INFO) << "spin_and_log stack trace:\n" << st1;
+
+ bthread::FLAGS_enable_fast_unwind = true;
+ std::string st2 = bthread::stack_trace(th);
+ LOG(INFO) << "fast_unwind spin_and_log stack trace:\n" << st2;
+ stop = true;
+ ASSERT_EQ(0, bthread_join(th, NULL));
+
+ std::string st3 = bthread::stack_trace(th);
+ LOG(INFO) << "ended bthread stack trace:\n" << st3;
+ ASSERT_NE(std::string::npos, st3.find("not exist now"));
- start = false;
- stop = false;
- ASSERT_EQ(0, bthread_start_urgent(&th, NULL, repeated_sleep, (void*)1));
- while (!start) {
- usleep(10 * 1000);
+ ok = st1.find("spin_and_log") != std::string::npos &&
+ st2.find("spin_and_log") != std::string::npos;
+ if (ok) {
+ break;
+ }
}
- bthread::FLAGS_enable_fast_unwind = false;
- st = bthread::stack_trace(th);
- LOG(INFO) << "fast_unwind repeated_sleep stack trace:\n" << st;
- ASSERT_NE(std::string::npos, st.find("repeated_sleep"));
-
- bthread::FLAGS_enable_fast_unwind = true;
- st = bthread::stack_trace(th);
- LOG(INFO) << "repeated_sleep stack trace:\n" << st;
- ASSERT_NE(std::string::npos, st.find("repeated_sleep"));
- stop = true;
- ASSERT_EQ(0, bthread_join(th, NULL));
+ ASSERT_TRUE(ok);
+}
+
+void repeated_sleep_trace() {
+ bool ok = false;
+ for (int i = 0; i < 10; ++i) {
+ start = false;
+ stop = false;
+ bthread_t th;
+ ASSERT_EQ(0, bthread_start_urgent(&th, NULL, repeated_sleep,
(void*)1));
+ while (!start) {
+ usleep(10 * 1000);
+ }
+ bthread::FLAGS_enable_fast_unwind = false;
+ std::string st1 = bthread::stack_trace(th);
+ LOG(INFO) << "repeated_sleep stack trace:\n" << st1;
- st = bthread::stack_trace(th);
- LOG(INFO) << "ended bthread stack trace:\n" << st;
- ASSERT_NE(std::string::npos, st.find("not exist now"));
+ bthread::FLAGS_enable_fast_unwind = true;
+ std::string st2 = bthread::stack_trace(th);
+ LOG(INFO) << "fast_unwind repeated_sleep stack trace:\n" << st2;
+ stop = true;
+ ASSERT_EQ(0, bthread_join(th, NULL));
+
+ std::string st3 = bthread::stack_trace(th);
+ LOG(INFO) << "ended bthread stack trace:\n" << st3;
+ ASSERT_NE(std::string::npos, st3.find("not exist now"));
+ ok = st1.find("repeated_sleep") != std::string::npos &&
+ st2.find("repeated_sleep") != std::string::npos;
+ if (ok) {
+ break;
+ }
+ }
+ ASSERT_TRUE(ok);
+}
+TEST_F(BthreadTest, trace) {
+ spin_and_log_trace();
+ repeated_sleep_trace();
}
#endif // BRPC_BTHREAD_TRACER
diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp
index 6ea45c04..cf471315 100644
--- a/test/endpoint_unittest.cpp
+++ b/test/endpoint_unittest.cpp
@@ -499,7 +499,7 @@ TEST(EndPointTest, tcp_connect) {
ASSERT_LE(0, sockfd) << "errno=" << errno;
}
{
- butil::fd_guard sockfd(butil::tcp_connect(ep1, NULL, 1));
+ butil::fd_guard sockfd(butil::tcp_connect(ep2, NULL, 1));
ASSERT_EQ(-1, sockfd) << "errno=" << errno;
ASSERT_EQ(ETIMEDOUT, errno);
}
@@ -553,7 +553,7 @@ void TestConnectInterruptImpl(bool timed) {
int64_t connect_ms = butil::cpuwide_time_ms() - start_ms;
LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms";
- timespec abstime = butil::milliseconds_from_now(connect_ms * 2);
+ timespec abstime = butil::milliseconds_from_now(connect_ms * 10);
rc = butil::pthread_timed_connect(
sockfd, (struct sockaddr*) &serv_addr,
serv_addr_size, &abstime);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]