This is an automated email from the ASF dual-hosted git repository.

achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 4648c9603 [rpc] introduce rpc_pending_connections metric
4648c9603 is described below

commit 4648c96037d90438b2ac955d3f2ddfe37e260263
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Tue Jan 23 22:47:40 2024 -0800

    [rpc] introduce rpc_pending_connections metric
    
    One of the prior patches introduced the 'rpc_listen_socket_rx_queue_size'
    histogram metric.  That one provides information on the history of the
    RPC listening sockets' backlog of pending connections, whereas this new
    'rpc_pending_connections' gauge reports the current total number of
    pending RPC connections across all the RPC endpoints of a Kudu server.
    The newly introduced metric is useful for assessing the current status
    of the listening RPC sockets.  It's also useful for post-mortem analysis
    with the diagnostic logs, which store snapshots of this new metric along
    with the timestamps when they were captured.
    
    Change-Id: I0bfdaf9047f43495df85edba3200286f743330a8
    Reviewed-on: http://gerrit.cloudera.org:8080/20949
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com>
---
 src/kudu/rpc/acceptor_pool.cc | 11 +++++++++++
 src/kudu/rpc/acceptor_pool.h  |  5 +++++
 src/kudu/rpc/messenger.cc     | 45 ++++++++++++++++++++++++++++++++++++++-----
 src/kudu/rpc/messenger.h      |  8 ++++++++
 src/kudu/rpc/rpc-test.cc      | 30 +++++++++++++++++++++++++++++
 5 files changed, 94 insertions(+), 5 deletions(-)

diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index fca0f1e85..a5038526e 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -220,6 +220,19 @@ int64_t AcceptorPool::num_rpc_connections_accepted() const {
   return rpc_connections_accepted_->value();
 }
 
+Status AcceptorPool::GetPendingConnectionsNum(uint32_t* result) const {
+  DiagnosticSocket ds;
+  RETURN_NOT_OK(ds.Init());
+
+  DiagnosticSocket::TcpSocketInfo info;
+  RETURN_NOT_OK(ds.Query(socket_, &info));
+  // For a listening socket, the RX queue size is the length of its accept
+  // backlog, i.e. the number of connections waiting to be accepted.
+  *result = info.rx_queue_size;
+
+  return Status::OK();
+}
+
 void AcceptorPool::RunThread() {
   const int64_t kCyclesPerSecond = static_cast<int64_t>(base::CyclesPerSecond());
 
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 440c81903..91ebe3b56 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -70,6 +70,12 @@ class AcceptorPool {
   // Return the number of connections accepted by this messenger. Thread-safe.
   int64_t num_rpc_connections_accepted() const;
 
+  // Upon success, return Status::OK() and write the current size of the
+  // listening socket's RX queue (i.e. the number of pending connections)
+  // into the 'result' out parameter. Otherwise, return the corresponding
+  // error status and leave the 'result' out parameter untouched.
+  Status GetPendingConnectionsNum(uint32_t* result) const;
+
  private:
   void RunThread();
 
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index df1046005..663a27cb3 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -46,6 +46,7 @@
 #include "kudu/security/tls_context.h"
 #include "kudu/security/token_verifier.h"
 #include "kudu/util/flags.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/socket.h"
@@ -54,6 +55,14 @@
 #include "kudu/util/thread_restrictions.h"
 #include "kudu/util/threadpool.h"
 
+METRIC_DEFINE_gauge_int32(server, rpc_pending_connections,
+                          "Pending RPC Connections",
+                          kudu::MetricUnit::kUnits,
+                          "The current total number of pending connections "
+                          "across all the listening sockets of this RPC server "
+                          "(-1 if no listening socket could provide the data)",
+                          kudu::MetricLevel::kInfo);
+
 using kudu::security::RpcAuthentication;
 using kudu::security::RpcEncryption;
 using std::string;
@@ -156,6 +165,33 @@ void Messenger::AllExternalReferencesDropped() {
   retain_self_.reset();
 }
 
+int32_t Messenger::GetPendingConnectionsNum() {
+  // 'acceptor_pools_' is protected by 'lock_' and AddAcceptorPool() may append
+  // to it concurrently with this gauge callback, so iterate over a snapshot
+  // taken under the lock. The copied pointers also keep the pools alive.
+  decltype(acceptor_pools_) acceptor_pools;
+  {
+    std::lock_guard<percpu_rwlock> guard(lock_);
+    acceptor_pools = acceptor_pools_;
+  }
+
+  int32_t pool_reports_num = 0;
+  int32_t total_count = 0;
+  for (const auto& p : acceptor_pools) {
+    uint32_t count;
+    if (auto s = p->GetPendingConnectionsNum(&count); !s.ok()) {
+      KLOG_EVERY_N_SECS(WARNING, 60) << Substitute(
+          "$0: no data on pending connections for acceptor pool at $1",
+          s.ToString(), p->bind_address().ToString()) << THROTTLE_MSG;
+      continue;
+    }
+    ++pool_reports_num;
+    total_count += static_cast<int32_t>(count);
+  }
+  // Report -1 if none of the acceptor pools could provide the data.
+  return pool_reports_num == 0 ? -1 : total_count;
+}
+
 void Messenger::Shutdown() {
   ShutdownInternal(ShutdownMode::SYNC);
 }
@@ -223,12 +249,23 @@ Status Messenger::AddAcceptorPool(const Sockaddr& accept_addr,
   RETURN_NOT_OK(sock.Bind(accept_addr));
   Sockaddr addr;
   RETURN_NOT_OK(sock.GetSocketAddress(&addr));
-  auto acceptor_pool(std::make_shared<AcceptorPool>(
-      this, &sock, addr, acceptor_listen_backlog_));
 
-  std::lock_guard<percpu_rwlock> guard(lock_);
-  acceptor_pools_.push_back(acceptor_pool);
-  pool->swap(acceptor_pool);
+  {
+    std::lock_guard<percpu_rwlock> guard(lock_);
+    acceptor_pools_.emplace_back(std::make_shared<AcceptorPool>(
+        this, &sock, addr, acceptor_listen_backlog_));
+    *pool = acceptor_pools_.back();
+  }
+
+  // The 'rpc_pending_connections' metric is instantiated only when a messenger
+  // contains at least one acceptor pool, i.e. only for a server-side messenger.
+  // If the gauge has already been registered with this messenger's metric
+  // entity (e.g., when adding a second acceptor pool), the existing instance
+  // is reused.
+  METRIC_rpc_pending_connections.InstantiateFunctionGauge(
+      metric_entity_, [this]() { return this->GetPendingConnectionsNum(); })->
+      AutoDetachToLastValue(&metric_detacher_);
+
   return Status::OK();
 }
 
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index a8044fca4..65dcfc01f 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -491,6 +491,12 @@ class Messenger {
   // any references. See 'retain_self_' for more info.
   void AllExternalReferencesDropped();
 
+  // Get the total number of currently pending connections across all the RPC
+  // endpoints this messenger is bound to. This utility method returns -1
+  // if the information on the listening sockets' backlog cannot be retrieved
+  // from any of the RPC endpoints.
+  int32_t GetPendingConnectionsNum();
+
   const std::string name_;
 
   // Protects closing_, acceptor_pools_, rpc_services_.
@@ -618,6 +624,11 @@ class Messenger {
   // within a Reactor thread itself.
   std::shared_ptr<Messenger> retain_self_;
 
+  // Detaches the 'rpc_pending_connections' function gauge (whose callback
+  // captures 'this') when this messenger is destroyed. Declared last so it's
+  // destroyed first, while the fields the callback may read are still alive.
+  FunctionGaugeDetacher metric_detacher_;
+
   DISALLOW_COPY_AND_ASSIGN(Messenger);
 };
 
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index f90130aaf..ab9bdbb85 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -84,6 +84,7 @@ class AcceptorPool;
 
 METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
 METRIC_DECLARE_counter(timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep);
+METRIC_DECLARE_gauge_int32(rpc_pending_connections);
 METRIC_DECLARE_histogram(acceptor_dispatch_times);
 METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
 METRIC_DECLARE_histogram(rpc_incoming_queue_time);
@@ -1457,6 +1458,39 @@ TEST_P(TestRpc, AcceptorDispatchingTimesMetric) {
   });
 }
 
+// Basic verification of the 'rpc_pending_connections' metric.
+TEST_P(TestRpc, RpcPendingConnectionsMetric) {
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  {
+    Socket socket;
+    ASSERT_OK(socket.Init(server_addr.family(), /*flags=*/0));
+    ASSERT_OK(socket.Connect(server_addr));
+  }
+
+  // Get the reference to the already registered metric with the proper
+  // callback to fetch the necessary information. The fake callback returning
+  // -3 is ignored if the metric is already registered: it's to make sure
+  // the actual gauge returns a proper value, which is verified below.
+  auto pending_connections_gauge =
+      METRIC_rpc_pending_connections.InstantiateFunctionGauge(
+          server_messenger_->metric_entity(), []() { return -3; });
+
+  // The server's acceptor thread accepts connections asynchronously, so the
+  // connection established above might still be in the listening socket's
+  // backlog for a short while after Connect() returns. Eventually, there
+  // should be no pending connections. The number of pending connections is
+  // properly reported on Linux only as of now; elsewhere it should be -1.
+#if defined(__linux__)
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, pending_connections_gauge->value());
+  });
+#else
+  ASSERT_EQ(-1, pending_connections_gauge->value());
+#endif
+}
+
 static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
                                      CountDownLatch* latch) {
   messenger->reset();

Reply via email to