This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 6c02274ce [rpc] increase listened socket backlog up to 512 6c02274ce is described below commit 6c02274ce275615fbbc83703b6f695a0a53c87f1 Author: Alexey Serbin <ale...@apache.org> AuthorDate: Thu Dec 14 10:36:31 2023 -0800 [rpc] increase listened socket backlog up to 512 This patch updates the default value for --rpc_acceptor_listen_backlog, raising it from 128 to 512. This helps busy Kudu servers accommodate larger bursts of incoming requests for new RPC connections. In addition, a warning message is written to the log if the flag's setting is effectively capped by the system-level limit. I manually verified that the warning is issued as expected on Linux and macOS, respectively: --rpc_acceptor_listen_backlog setting 512 is capped at 128 by /proc/sys/net/core/somaxconn --rpc_acceptor_listen_backlog setting 512 is capped at 256 by kern.ipc.somaxconn Change-Id: Ib6f5791acad6ea0787e23d4c71ab2a7ac4c8c1f2 Reviewed-on: http://gerrit.cloudera.org:8080/20797 Tested-by: Kudu Jenkins Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com> --- src/kudu/rpc/acceptor_pool.cc | 38 +++++++++++++++----- src/kudu/rpc/acceptor_pool.h | 10 +++++- src/kudu/rpc/messenger.cc | 9 +++-- src/kudu/rpc/messenger.h | 11 ++++++ src/kudu/server/server_base.cc | 80 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 13 deletions(-) diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc index ec7c86bf7..c20807a93 100644 --- a/src/kudu/rpc/acceptor_pool.cc +++ b/src/kudu/rpc/acceptor_pool.cc @@ -42,6 +42,8 @@ #include "kudu/util/status.h" #include "kudu/util/thread.h" +using std::string; +using strings::Substitute; METRIC_DEFINE_counter(server, rpc_connections_accepted, "RPC Connections Accepted", @@ -66,27 +68,45 @@ METRIC_DEFINE_histogram(server, acceptor_dispatch_times, kudu::MetricLevel::kInfo, 1000000, 2); -DEFINE_int32(rpc_acceptor_listen_backlog, 
128, +DEFINE_int32(rpc_acceptor_listen_backlog, + kudu::rpc::AcceptorPool::kDefaultListenBacklog, "Socket backlog parameter used when listening for RPC connections. " "This defines the maximum length to which the queue of pending " - "TCP connections inbound to the RPC server may grow. If a connection " - "request arrives when the queue is full, the client may receive " - "an error. Higher values may help the server ride over bursts of " - "new inbound connection requests."); + "TCP connections inbound to the RPC server may grow. The value " + "might be silently capped by the system-level limit on the listened " + "socket's backlog. The value of -1 has the semantics of the " + "longest possible queue with the length up to the system-level " + "limit. If a connection request arrives when the queue is full, " + "the client may receive an error. Higher values may help " + "the server ride over bursts of new inbound connection requests."); TAG_FLAG(rpc_acceptor_listen_backlog, advanced); -using std::string; -using strings::Substitute; +namespace { +bool ValidateListenBacklog(const char* flagname, int value) { + if (value >= -1) { + return true; + } + LOG(ERROR) << Substitute( + "$0: invalid setting for $1; regular setting must be at least 0, and -1 " + "is a special value with the semantics of maximum possible setting " + "capped at the system-wide limit", value, flagname); + return false; +} +} // anonymous namespace +DEFINE_validator(rpc_acceptor_listen_backlog, &ValidateListenBacklog); + namespace kudu { namespace rpc { AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket, - const Sockaddr& bind_address) + const Sockaddr& bind_address, + int listen_backlog) : messenger_(messenger), socket_(socket->Release()), bind_address_(bind_address), + listen_backlog_(listen_backlog), closing_(false) { const auto& metric_entity = messenger->metric_entity(); auto& connections_accepted = bind_address.is_ip() @@ -101,7 +121,7 @@ AcceptorPool::~AcceptorPool() { } Status 
AcceptorPool::Start(int num_threads) { - RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog)); + RETURN_NOT_OK(socket_.Listen(listen_backlog_)); for (int i = 0; i < num_threads; i++) { scoped_refptr<Thread> new_thread; diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h index 14b2740e1..50e15e201 100644 --- a/src/kudu/rpc/acceptor_pool.h +++ b/src/kudu/rpc/acceptor_pool.h @@ -41,10 +41,17 @@ class Messenger; // shut down, if Shutdown() is called, or if the pool object is destructed. class AcceptorPool { public: + // Default size of the pending connections queue for a socket listened by + // AcceptorPool::Start(). + static constexpr int kDefaultListenBacklog = 512; + // Create a new acceptor pool. Calls socket::Release to take ownership of the // socket. // 'socket' must be already bound, but should not yet be listening. - AcceptorPool(Messenger* messenger, Socket* socket, const Sockaddr& bind_address); + AcceptorPool(Messenger* messenger, + Socket* socket, + const Sockaddr& bind_address, + int listen_backlog = kDefaultListenBacklog); ~AcceptorPool(); // Start listening and accepting connections. 
@@ -69,6 +76,7 @@ class AcceptorPool { Messenger* messenger_; Socket socket_; const Sockaddr bind_address_; + const int listen_backlog_; std::vector<scoped_refptr<Thread>> threads_; std::atomic<bool> closing_; diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc index 923d85f7f..6820490b3 100644 --- a/src/kudu/rpc/messenger.cc +++ b/src/kudu/rpc/messenger.cc @@ -69,6 +69,7 @@ const int64_t MessengerBuilder::kRpcNegotiationTimeoutMs = 3000; MessengerBuilder::MessengerBuilder(string name) : name_(std::move(name)), connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)), + acceptor_listen_backlog_(AcceptorPool::kDefaultListenBacklog), num_reactors_(4), min_negotiation_threads_(0), max_negotiation_threads_(4), @@ -220,9 +221,10 @@ Status Messenger::AddAcceptorPool(const Sockaddr& accept_addr, RETURN_NOT_OK(sock.SetReusePort(true)); } RETURN_NOT_OK(sock.Bind(accept_addr)); - Sockaddr remote; - RETURN_NOT_OK(sock.GetSocketAddress(&remote)); - auto acceptor_pool(std::make_shared<AcceptorPool>(this, &sock, remote)); + Sockaddr addr; + RETURN_NOT_OK(sock.GetSocketAddress(&addr)); + auto acceptor_pool(std::make_shared<AcceptorPool>( + this, &sock, addr, acceptor_listen_backlog_)); std::lock_guard<percpu_rwlock> guard(lock_); acceptor_pools_.push_back(acceptor_pool); @@ -330,6 +332,7 @@ Messenger::Messenger(const MessengerBuilder &bld) sasl_proto_name_(bld.sasl_proto_name_), keytab_file_(bld.keytab_file_), reuseport_(bld.reuseport_), + acceptor_listen_backlog_(bld.acceptor_listen_backlog_), retain_self_(this) { for (int i = 0; i < bld.num_reactors_; i++) { reactors_.push_back(new Reactor(retain_self_, i, bld)); diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h index d8e37e99c..753681d46 100644 --- a/src/kudu/rpc/messenger.h +++ b/src/kudu/rpc/messenger.h @@ -97,6 +97,11 @@ class MessengerBuilder { return *this; } + MessengerBuilder& set_acceptor_listen_backlog(int max_queue_len) { + acceptor_listen_backlog_ = max_queue_len; + return *this; 
+ } + // Set the number of reactor threads that will be used for sending and // receiving. MessengerBuilder& set_num_reactors(int num_reactors) { @@ -270,6 +275,7 @@ class MessengerBuilder { private: const std::string name_; MonoDelta connection_keepalive_time_; + int acceptor_listen_backlog_; int num_reactors_; int min_negotiation_threads_; int max_negotiation_threads_; @@ -562,6 +568,11 @@ class Messenger { // Whether to set SO_REUSEPORT on the listening sockets. const bool reuseport_; + // Acceptor's listened socket backlog: the capacity of the queue to + // accommodate incoming (but not accepted yet) connection requests to the + // messenger's listening sockets. + const int acceptor_listen_backlog_; + // The ownership of the Messenger object is somewhat subtle. The pointer graph // looks like this: // diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc index 84d6a7b24..8b5f554b0 100644 --- a/src/kudu/server/server_base.cc +++ b/src/kudu/server/server_base.cc @@ -17,9 +17,15 @@ #include "kudu/server/server_base.h" +#if defined(__APPLE__) +#include <sys/sysctl.h> +#endif + #include <algorithm> +#include <cerrno> // IWYU pragma: keep #include <cstdint> #include <functional> +#include <limits> #include <mutex> #include <optional> #include <set> @@ -72,6 +78,7 @@ #include "kudu/util/cloud/instance_detector.h" #include "kudu/util/cloud/instance_metadata.h" #include "kudu/util/env.h" +#include "kudu/util/errno.h" // IWYU pragma: keep #include "kudu/util/faststring.h" #include "kudu/util/file_cache.h" #include "kudu/util/flag_tags.h" @@ -303,6 +310,7 @@ DECLARE_uint32(dns_resolver_cache_capacity_mb); DECLARE_uint32(dns_resolver_cache_ttl_sec); DECLARE_int32(fs_data_dirs_available_space_cache_seconds); DECLARE_int32(fs_wal_dir_available_space_cache_seconds); +DECLARE_int32(rpc_acceptor_listen_backlog); DECLARE_int64(fs_wal_dir_reserved_bytes); DECLARE_int64(fs_data_dirs_reserved_bytes); DECLARE_string(log_filename); @@ -624,6 +632,68 @@ int64_t 
GetFileCacheCapacity(Env* env) { return FLAGS_server_max_open_files; } +#if defined(__linux__) +// See https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt +// and https://man7.org/linux/man-pages/man2/listen.2.html for details. +constexpr const char* const kListenBacklogMax = "/proc/sys/net/core/somaxconn"; +#endif + +#if defined(__APPLE__) +// See https://man.freebsd.org/cgi/man.cgi?query=listen for details. +// NOTE: it might be not exactly relevant to Darwin/macOS, but so far nothing +// indicates it's not applicable at least for Darwin 20.6.0/macOS 11.7. +constexpr const char* const kListenBacklogMax = "kern.ipc.somaxconn"; +#endif + +int32_t GetEffectiveListenSocketBacklog(Env* env, int backlog) { +#if defined(__APPLE__) + uint32_t buf_val; + size_t len = sizeof(buf_val); + if (sysctlbyname(kListenBacklogMax, &buf_val, &len, nullptr, 0) == -1) { + int err = errno; + LOG(WARNING) << Substitute( + "could not retrieve $0 to get listened socket queue size limit: $1", + kListenBacklogMax, ErrnoToString(err)); + return backlog; + } + DCHECK_EQ(sizeof(buf_val), len); +#elif defined(__linux__) + faststring buf; + if (auto s = ReadFileToString(env, kListenBacklogMax, &buf); !s.ok()) { + LOG(WARNING) << Substitute( + "could not read $0 to get listened socket queue size limit: $1", + kListenBacklogMax, s.ToString()); + return backlog; + } + uint32_t buf_val; + if (!safe_strtou32(buf.ToString(), &buf_val)) { + LOG(WARNING) << Substitute( + "could not parse contents of $0 ('$1') to get listened socket queue size limit", + kListenBacklogMax, buf.ToString()); + return backlog; + } +#else + return backlog; +#endif + + // 'rpc_acceptor_listen_backlog == -1' means the highest possible value + // for the backlog size. + uint32_t effective_backlog = + (backlog >= 0) ? backlog : std::numeric_limits<uint32_t>::max(); + + // The system-wide limit caps the 'backlog' parameter of the listen() + // system call. 
+ effective_backlog = std::min<uint32_t>(effective_backlog, buf_val); + + // A bit of paranoia w.r.t. the system-level limit: + // * for a debug build, use DCHECK to make sure the value is sane + // * for a release build, cap it to INT32_MAX + DCHECK_LE(effective_backlog, std::numeric_limits<int32_t>::max()); + effective_backlog = std::min<uint32_t>(effective_backlog, + std::numeric_limits<int32_t>::max()); + return effective_backlog; +} + } // anonymous namespace ServerBase::ServerBase(string name, const ServerBaseOptions& options, @@ -812,6 +882,15 @@ Status ServerBase::Init() { vector<string> rpc_tls_excluded_protocols = strings::Split( FLAGS_rpc_tls_excluded_protocols, ",", strings::SkipEmpty()); + const auto listen_backlog = FLAGS_rpc_acceptor_listen_backlog; + const auto effective_listen_backlog = GetEffectiveListenSocketBacklog( + Env::Default(), listen_backlog); + if (effective_listen_backlog != listen_backlog) { + LOG(WARNING) << Substitute( + "--rpc_acceptor_listen_backlog setting $0 is capped at $1 by $2", + listen_backlog, effective_listen_backlog, kListenBacklogMax); + } + // Create the Messenger. rpc::MessengerBuilder builder(name_); builder.set_num_reactors(FLAGS_num_reactor_threads) @@ -831,6 +910,7 @@ Status ServerBase::Init() { .set_epki_private_password_key_cmd(FLAGS_rpc_private_key_password_cmd) .set_keytab_file(FLAGS_keytab_file) .set_hostname(hostname) + .set_acceptor_listen_backlog(listen_backlog) .enable_inbound_tls(); auto username = kudu::security::GetLoggedInUsernameFromKeytab();