This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 1de4e933 Add the connection timeout when connecting master (#1172)
1de4e933 is described below
commit 1de4e933c7024f0df97e6485a4271015420aa2ef
Author: Myth <[email protected]>
AuthorDate: Mon Dec 12 22:30:12 2022 +0800
Add the connection timeout when connecting master (#1172)
---
src/cluster/replication.cc | 13 ++++++++++---
src/common/io_util.cc | 4 +---
src/common/scope_exit.h | 4 ++--
3 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index b757819f..bd24d997 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -42,6 +42,7 @@
#include "status.h"
#include "storage/batch_debugger.h"
#include "thread_util.h"
+#include "time_util.h"
Status FeedSlaveThread::Start() {
try {
@@ -236,18 +237,24 @@ void ReplicationThread::CallbacksStateMachine::Start() {
handlers_.emplace_front(CallbacksStateMachine::WRITE, "auth write",
authWriteCB);
}
+ uint64_t last_connect_timestamp = 0;
+ int connect_timeout_ms = 3100;
+
while (!repl_->stop_flag_ && bev == nullptr) {
- Status s = Util::SockConnect(repl_->host_, repl_->port_, &cfd);
+ if (Util::GetTimeStampMS() - last_connect_timestamp < 1000) {
+ // prevent frequent re-connect when the master is down with the
connection refused error
+ sleep(1);
+ }
+ last_connect_timestamp = Util::GetTimeStampMS();
+ Status s = Util::SockConnect(repl_->host_, repl_->port_, &cfd,
connect_timeout_ms);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to connect the master, err: " <<
s.Msg();
- sleep(1);
continue;
}
bev = bufferevent_socket_new(repl_->base_, cfd, BEV_OPT_CLOSE_ON_FREE);
if (bev == nullptr) {
close(cfd);
LOG(ERROR) << "[replication] Failed to create the event socket";
- sleep(1);
continue;
}
}
diff --git a/src/common/io_util.cc b/src/common/io_util.cc
index 4c3e40ba..8ae2882f 100644
--- a/src/common/io_util.cc
+++ b/src/common/io_util.cc
@@ -151,9 +151,7 @@ Status SockConnect(const std::string &host, uint32_t port,
int *fd, int conn_tim
sin.sin_port = htons(port);
fcntl(*fd, F_SETFL, O_NONBLOCK);
- if (connect(*fd, reinterpret_cast<sockaddr *>(&sin), sizeof(sin))) {
- return Status::FromErrno();
- }
+ connect(*fd, reinterpret_cast<sockaddr *>(&sin), sizeof(sin));
auto retmask = Util::aeWait(*fd, AE_WRITABLE, conn_timeout);
if ((retmask & AE_WRITABLE) == 0 || (retmask & AE_ERROR) != 0 || (retmask
& AE_HUP) != 0) {
diff --git a/src/common/scope_exit.h b/src/common/scope_exit.h
index 93e01259..2668091f 100644
--- a/src/common/scope_exit.h
+++ b/src/common/scope_exit.h
@@ -35,9 +35,9 @@ struct ScopeExit {
if (enabled_) f_();
}
- void Enable() { enabled_ = false; }
+ void Enable() { enabled_ = true; }
- void Disable() { enabled_ = true; }
+ void Disable() { enabled_ = false; }
bool enabled_;
F f_;