This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c02274ce [rpc] increase listened socket backlog up to 512
6c02274ce is described below

commit 6c02274ce275615fbbc83703b6f695a0a53c87f1
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Thu Dec 14 10:36:31 2023 -0800

    [rpc] increase listened socket backlog up to 512
    
    This patch raises the default value of --rpc_acceptor_listen_backlog
    to 512.  This helps busy Kudu servers accommodate larger bursts of
    incoming requests for new RPC connections.  In addition, a warning
    message is output into the log if the setting for the flag is
    effectively capped by the system-level limit.
    
    I manually verified that the warning is issued as expected on Linux and
    macOS, respectively:
    
      --rpc_acceptor_listen_backlog setting 512 is capped at 128 by /proc/sys/net/core/somaxconn
    
      --rpc_acceptor_listen_backlog setting 512 is capped at 256 by kern.ipc.somaxconn
    
    Change-Id: Ib6f5791acad6ea0787e23d4c71ab2a7ac4c8c1f2
    Reviewed-on: http://gerrit.cloudera.org:8080/20797
    Tested-by: Kudu Jenkins
    Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com>
---
 src/kudu/rpc/acceptor_pool.cc  | 38 +++++++++++++++-----
 src/kudu/rpc/acceptor_pool.h   | 10 +++++-
 src/kudu/rpc/messenger.cc      |  9 +++--
 src/kudu/rpc/messenger.h       | 11 ++++++
 src/kudu/server/server_base.cc | 80 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 135 insertions(+), 13 deletions(-)

diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index ec7c86bf7..c20807a93 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -42,6 +42,8 @@
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
+using std::string;
+using strings::Substitute;
 
 METRIC_DEFINE_counter(server, rpc_connections_accepted,
                       "RPC Connections Accepted",
@@ -66,27 +68,45 @@ METRIC_DEFINE_histogram(server, acceptor_dispatch_times,
                         kudu::MetricLevel::kInfo,
                         1000000, 2);
 
-DEFINE_int32(rpc_acceptor_listen_backlog, 128,
+DEFINE_int32(rpc_acceptor_listen_backlog,
+             kudu::rpc::AcceptorPool::kDefaultListenBacklog,
              "Socket backlog parameter used when listening for RPC 
connections. "
              "This defines the maximum length to which the queue of pending "
-             "TCP connections inbound to the RPC server may grow. If a 
connection "
-             "request arrives when the queue is full, the client may receive "
-             "an error. Higher values may help the server ride over bursts of "
-             "new inbound connection requests.");
+             "TCP connections inbound to the RPC server may grow. The value "
+             "might be silently capped by the system-level limit on the 
listened "
+             "socket's backlog. The value of -1 has the semantics of the "
+             "longest possible queue with the length up to the system-level "
+             "limit. If a connection request arrives when the queue is full, "
+             "the client may receive an error. Higher values may help "
+             "the server ride over bursts of new inbound connection 
requests.");
 TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
 
-using std::string;
-using strings::Substitute;
+namespace {
+bool ValidateListenBacklog(const char* flagname, int value) {
+  if (value >= -1) {
+    return true;
+  }
+  LOG(ERROR) << Substitute(
+      "$0: invalid setting for $1; regular setting must be at least 0, and -1 "
+      "is a special value with the semantics of maximum possible setting "
+      "capped at the system-wide limit", value, flagname);
+  return false;
+}
+} // anonymous namespace
+DEFINE_validator(rpc_acceptor_listen_backlog, &ValidateListenBacklog);
+
 
 namespace kudu {
 namespace rpc {
 
 AcceptorPool::AcceptorPool(Messenger* messenger,
                            Socket* socket,
-                           const Sockaddr& bind_address)
+                           const Sockaddr& bind_address,
+                           int listen_backlog)
     : messenger_(messenger),
       socket_(socket->Release()),
       bind_address_(bind_address),
+      listen_backlog_(listen_backlog),
       closing_(false) {
   const auto& metric_entity = messenger->metric_entity();
   auto& connections_accepted = bind_address.is_ip()
@@ -101,7 +121,7 @@ AcceptorPool::~AcceptorPool() {
 }
 
 Status AcceptorPool::Start(int num_threads) {
-  RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog));
+  RETURN_NOT_OK(socket_.Listen(listen_backlog_));
 
   for (int i = 0; i < num_threads; i++) {
     scoped_refptr<Thread> new_thread;
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 14b2740e1..50e15e201 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -41,10 +41,17 @@ class Messenger;
 // shut down, if Shutdown() is called, or if the pool object is destructed.
 class AcceptorPool {
  public:
+  // Default size of the pending connections queue for a socket listened by
+  // AcceptorPool::Start().
+  static constexpr int kDefaultListenBacklog = 512;
+
   // Create a new acceptor pool.  Calls socket::Release to take ownership of 
the
   // socket.
   // 'socket' must be already bound, but should not yet be listening.
-  AcceptorPool(Messenger* messenger, Socket* socket, const Sockaddr& 
bind_address);
+  AcceptorPool(Messenger* messenger,
+               Socket* socket,
+               const Sockaddr& bind_address,
+               int listen_backlog = kDefaultListenBacklog);
   ~AcceptorPool();
 
   // Start listening and accepting connections.
@@ -69,6 +76,7 @@ class AcceptorPool {
   Messenger* messenger_;
   Socket socket_;
   const Sockaddr bind_address_;
+  const int listen_backlog_;
   std::vector<scoped_refptr<Thread>> threads_;
 
   std::atomic<bool> closing_;
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 923d85f7f..6820490b3 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -69,6 +69,7 @@ const int64_t MessengerBuilder::kRpcNegotiationTimeoutMs = 
3000;
 MessengerBuilder::MessengerBuilder(string name)
     : name_(std::move(name)),
       connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
+      acceptor_listen_backlog_(AcceptorPool::kDefaultListenBacklog),
       num_reactors_(4),
       min_negotiation_threads_(0),
       max_negotiation_threads_(4),
@@ -220,9 +221,10 @@ Status Messenger::AddAcceptorPool(const Sockaddr& 
accept_addr,
     RETURN_NOT_OK(sock.SetReusePort(true));
   }
   RETURN_NOT_OK(sock.Bind(accept_addr));
-  Sockaddr remote;
-  RETURN_NOT_OK(sock.GetSocketAddress(&remote));
-  auto acceptor_pool(std::make_shared<AcceptorPool>(this, &sock, remote));
+  Sockaddr addr;
+  RETURN_NOT_OK(sock.GetSocketAddress(&addr));
+  auto acceptor_pool(std::make_shared<AcceptorPool>(
+      this, &sock, addr, acceptor_listen_backlog_));
 
   std::lock_guard<percpu_rwlock> guard(lock_);
   acceptor_pools_.push_back(acceptor_pool);
@@ -330,6 +332,7 @@ Messenger::Messenger(const MessengerBuilder &bld)
       sasl_proto_name_(bld.sasl_proto_name_),
       keytab_file_(bld.keytab_file_),
       reuseport_(bld.reuseport_),
+      acceptor_listen_backlog_(bld.acceptor_listen_backlog_),
       retain_self_(this) {
   for (int i = 0; i < bld.num_reactors_; i++) {
     reactors_.push_back(new Reactor(retain_self_, i, bld));
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index d8e37e99c..753681d46 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -97,6 +97,11 @@ class MessengerBuilder {
     return *this;
   }
 
+  MessengerBuilder& set_acceptor_listen_backlog(int max_queue_len) {
+    acceptor_listen_backlog_ = max_queue_len;
+    return *this;
+  }
+
   // Set the number of reactor threads that will be used for sending and
   // receiving.
   MessengerBuilder& set_num_reactors(int num_reactors) {
@@ -270,6 +275,7 @@ class MessengerBuilder {
  private:
   const std::string name_;
   MonoDelta connection_keepalive_time_;
+  int acceptor_listen_backlog_;
   int num_reactors_;
   int min_negotiation_threads_;
   int max_negotiation_threads_;
@@ -562,6 +568,11 @@ class Messenger {
   // Whether to set SO_REUSEPORT on the listening sockets.
   const bool reuseport_;
 
+  // Acceptor's listened socket backlog: the capacity of the queue to
+  // accommodate incoming (but not accepted yet) connection requests to the
+  // messenger's listening sockets.
+  const int acceptor_listen_backlog_;
+
   // The ownership of the Messenger object is somewhat subtle. The pointer 
graph
   // looks like this:
   //
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 84d6a7b24..8b5f554b0 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -17,9 +17,15 @@
 
 #include "kudu/server/server_base.h"
 
+#if defined(__APPLE__)
+#include <sys/sysctl.h>
+#endif
+
 #include <algorithm>
+#include <cerrno> // IWYU pragma: keep
 #include <cstdint>
 #include <functional>
+#include <limits>
 #include <mutex>
 #include <optional>
 #include <set>
@@ -72,6 +78,7 @@
 #include "kudu/util/cloud/instance_detector.h"
 #include "kudu/util/cloud/instance_metadata.h"
 #include "kudu/util/env.h"
+#include "kudu/util/errno.h"  // IWYU pragma: keep
 #include "kudu/util/faststring.h"
 #include "kudu/util/file_cache.h"
 #include "kudu/util/flag_tags.h"
@@ -303,6 +310,7 @@ DECLARE_uint32(dns_resolver_cache_capacity_mb);
 DECLARE_uint32(dns_resolver_cache_ttl_sec);
 DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
 DECLARE_int32(fs_wal_dir_available_space_cache_seconds);
+DECLARE_int32(rpc_acceptor_listen_backlog);
 DECLARE_int64(fs_wal_dir_reserved_bytes);
 DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_string(log_filename);
@@ -624,6 +632,68 @@ int64_t GetFileCacheCapacity(Env* env) {
   return FLAGS_server_max_open_files;
 }
 
+#if defined(__linux__)
+// See https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt
+// and https://man7.org/linux/man-pages/man2/listen.2.html for details.
+constexpr const char* const kListenBacklogMax = "/proc/sys/net/core/somaxconn";
+#endif
+
+#if defined(__APPLE__)
+// See https://man.freebsd.org/cgi/man.cgi?query=listen for details.
+// NOTE: it might be not exactly relevant to Darwin/macOS, but so far nothing
+//       indicates it's not applicable at least for Darwin 20.6.0/macOS 11.7.
+constexpr const char* const kListenBacklogMax = "kern.ipc.somaxconn";
+#endif
+
+int32_t GetEffectiveListenSocketBacklog(Env* env, int backlog) {
+#if defined(__APPLE__)
+  uint32_t buf_val;
+  size_t len = sizeof(buf_val);
+  if (sysctlbyname(kListenBacklogMax, &buf_val, &len, nullptr, 0) == -1) {
+    int err = errno;
+    LOG(WARNING) << Substitute(
+        "could not retrieve $0 to get listened socket queue size limit: $1",
+        kListenBacklogMax, ErrnoToString(err));
+    return backlog;
+  }
+  DCHECK_EQ(sizeof(buf_val), len);
+#elif defined(__linux__)
+  faststring buf;
+  if (auto s = ReadFileToString(env, kListenBacklogMax, &buf); !s.ok()) {
+    LOG(WARNING) << Substitute(
+        "could not read $0 to get listened socket queue size limit: $1",
+        kListenBacklogMax, s.ToString());
+    return backlog;
+  }
+  uint32_t buf_val;
+  if (!safe_strtou32(buf.ToString(), &buf_val)) {
+    LOG(WARNING) << Substitute(
+        "could not parse contents of $0 ('$1') to get listened socket queue 
size limit",
+        kListenBacklogMax, buf.ToString());
+    return backlog;
+  }
+#else
+  return backlog;
+#endif
+
+  // 'rpc_acceptor_listen_backlog == -1' means the highest possible value
+  // for the backlog size.
+  uint32_t effective_backlog =
+      (backlog >= 0) ? backlog : std::numeric_limits<uint32_t>::max();
+
+  // The system-wide limit caps the 'backlog' parameter of the listen()
+  // system call.
+  effective_backlog = std::min<uint32_t>(effective_backlog, buf_val);
+
+  // A bit of paranoia w.r.t. the system-level limit:
+  //   * for a debug build, use DCHECK to make sure the value is sane
+  //   * for a release build, cap it to INT32_MAX
+  DCHECK_LE(effective_backlog, std::numeric_limits<int32_t>::max());
+  effective_backlog = std::min<uint32_t>(effective_backlog,
+                                         std::numeric_limits<int32_t>::max());
+  return effective_backlog;
+}
+
 } // anonymous namespace
 
 ServerBase::ServerBase(string name, const ServerBaseOptions& options,
@@ -812,6 +882,15 @@ Status ServerBase::Init() {
   vector<string> rpc_tls_excluded_protocols = strings::Split(
       FLAGS_rpc_tls_excluded_protocols, ",", strings::SkipEmpty());
 
+  const auto listen_backlog = FLAGS_rpc_acceptor_listen_backlog;
+  const auto effective_listen_backlog = GetEffectiveListenSocketBacklog(
+      Env::Default(), listen_backlog);
+  if (effective_listen_backlog != listen_backlog) {
+    LOG(WARNING) << Substitute(
+        "--rpc_acceptor_listen_backlog setting $0 is capped at $1 by $2",
+        listen_backlog, effective_listen_backlog, kListenBacklogMax);
+  }
+
   // Create the Messenger.
   rpc::MessengerBuilder builder(name_);
   builder.set_num_reactors(FLAGS_num_reactor_threads)
@@ -831,6 +910,7 @@ Status ServerBase::Init() {
          .set_epki_private_password_key_cmd(FLAGS_rpc_private_key_password_cmd)
          .set_keytab_file(FLAGS_keytab_file)
          .set_hostname(hostname)
+         .set_acceptor_listen_backlog(listen_backlog)
          .enable_inbound_tls();
 
   auto username = kudu::security::GetLoggedInUsernameFromKeytab();

Reply via email to