This is an automated email from the ASF dual-hosted git repository. achennaka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 4648c9603 [rpc] introduce rpc_pending_connections metric 4648c9603 is described below commit 4648c96037d90438b2ac955d3f2ddfe37e260263 Author: Alexey Serbin <ale...@apache.org> AuthorDate: Tue Jan 23 22:47:40 2024 -0800 [rpc] introduce rpc_pending_connections metric One of the prior patches introduced 'rpc_listen_socket_rx_queue_size' histogram-type metric. That one provides information on the history of the RPC listening sockets' backlog of pending connections, but this new 'rpc_pending_connections' metric reports on the current total number of pending RPC connections across all the RPC endpoints of a Kudu server. The newly introduced metric is useful for assessing the current status of the listening RPC sockets. It's also useful for post-mortem analysis using the diagnostic logs that store snapshots of this new metric tied to the timestamps when they were captured. Change-Id: I0bfdaf9047f43495df85edba3200286f743330a8 Reviewed-on: http://gerrit.cloudera.org:8080/20949 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com> --- src/kudu/rpc/acceptor_pool.cc | 11 +++++++++++ src/kudu/rpc/acceptor_pool.h | 5 +++++ src/kudu/rpc/messenger.cc | 45 ++++++++++++++++++++++++++++++++++++++----- src/kudu/rpc/messenger.h | 8 ++++++++ src/kudu/rpc/rpc-test.cc | 30 +++++++++++++++++++++++++++++ 5 files changed, 94 insertions(+), 5 deletions(-) diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc index fca0f1e85..a5038526e 100644 --- a/src/kudu/rpc/acceptor_pool.cc +++ b/src/kudu/rpc/acceptor_pool.cc @@ -220,6 +220,17 @@ int64_t AcceptorPool::num_rpc_connections_accepted() const { return rpc_connections_accepted_->value(); } +Status AcceptorPool::GetPendingConnectionsNum(uint32_t* result) const { + DiagnosticSocket ds; + RETURN_NOT_OK(ds.Init()); + + DiagnosticSocket::TcpSocketInfo info; + RETURN_NOT_OK(ds.Query(socket_, &info)); + *result = 
info.rx_queue_size; + + return Status::OK(); +} + void AcceptorPool::RunThread() { const int64_t kCyclesPerSecond = static_cast<int64_t>(base::CyclesPerSecond()); diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h index 440c81903..91ebe3b56 100644 --- a/src/kudu/rpc/acceptor_pool.h +++ b/src/kudu/rpc/acceptor_pool.h @@ -70,6 +70,11 @@ class AcceptorPool { // Return the number of connections accepted by this messenger. Thread-safe. int64_t num_rpc_connections_accepted() const; + // Upon success, return Status::OK() and write the current size of the + // listening socket's RX queue into the 'result' out parameter. Otherwise, + // return corresponding status and leave the 'result' out parameter untouched. + Status GetPendingConnectionsNum(uint32_t* result) const; + private: void RunThread(); diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc index df1046005..663a27cb3 100644 --- a/src/kudu/rpc/messenger.cc +++ b/src/kudu/rpc/messenger.cc @@ -46,6 +46,7 @@ #include "kudu/security/tls_context.h" #include "kudu/security/token_verifier.h" #include "kudu/util/flags.h" +#include "kudu/util/logging.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/net/socket.h" @@ -54,6 +55,14 @@ #include "kudu/util/thread_restrictions.h" #include "kudu/util/threadpool.h" +METRIC_DEFINE_gauge_int32(server, rpc_pending_connections, + "Pending RPC Connections", + kudu::MetricUnit::kUnits, + "The current size of the longest backlog of pending " + "connections among all the listening sockets " + "of this RPC server", + kudu::MetricLevel::kInfo); + using kudu::security::RpcAuthentication; using kudu::security::RpcEncryption; using std::string; @@ -156,6 +165,23 @@ void Messenger::AllExternalReferencesDropped() { retain_self_.reset(); } +int32_t Messenger::GetPendingConnectionsNum() { + auto pool_reports_num = 0; + int32_t total_count = 0; + for (const auto& p : acceptor_pools_) { + uint32_t count; + if (auto s = 
p->GetPendingConnectionsNum(&count); !s.ok()) { + KLOG_EVERY_N_SECS(WARNING, 60) << Substitute( + "$0: no data on pending connections for acceptor pool at $1", + s.ToString(), p->bind_address().ToString()) << THROTTLE_MSG; + continue; + } + ++pool_reports_num; + total_count += static_cast<int32_t>(count); + } + return pool_reports_num == 0 ? -1 : total_count; +} + void Messenger::Shutdown() { ShutdownInternal(ShutdownMode::SYNC); } @@ -223,12 +249,21 @@ Status Messenger::AddAcceptorPool(const Sockaddr& accept_addr, RETURN_NOT_OK(sock.Bind(accept_addr)); Sockaddr addr; RETURN_NOT_OK(sock.GetSocketAddress(&addr)); - auto acceptor_pool(std::make_shared<AcceptorPool>( - this, &sock, addr, acceptor_listen_backlog_)); - std::lock_guard<percpu_rwlock> guard(lock_); - acceptor_pools_.push_back(acceptor_pool); - pool->swap(acceptor_pool); + { + std::lock_guard<percpu_rwlock> guard(lock_); + acceptor_pools_.emplace_back(std::make_shared<AcceptorPool>( + this, &sock, addr, acceptor_listen_backlog_)); + *pool = acceptor_pools_.back(); + } + + // 'rpc_pending_connections' metric is instantiated only when a messenger + // contains at least one acceptor pool. So, this metric is instantiated + // only for a server-side messenger. + METRIC_rpc_pending_connections.InstantiateFunctionGauge( + metric_entity_, [this]() { return this->GetPendingConnectionsNum(); })-> + AutoDetachToLastValue(&metric_detacher_); + return Status::OK(); } diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h index a8044fca4..65dcfc01f 100644 --- a/src/kudu/rpc/messenger.h +++ b/src/kudu/rpc/messenger.h @@ -491,6 +491,12 @@ class Messenger { // any references. See 'retain_self_' for more info. void AllExternalReferencesDropped(); + // Get the total number of currently pending connections across all the RPC + // endpoints this messenger is bound to. This utility method returns -1 + // if the information on the listening sockets' backlog cannot be retrieved + // from any of the RPC endpoints. 
+ int32_t GetPendingConnectionsNum(); + const std::string name_; // Protects closing_, acceptor_pools_, rpc_services_. @@ -618,6 +624,8 @@ class Messenger { // within a Reactor thread itself. std::shared_ptr<Messenger> retain_self_; + FunctionGaugeDetacher metric_detacher_; + DISALLOW_COPY_AND_ASSIGN(Messenger); }; diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index f90130aaf..ab9bdbb85 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -84,6 +84,7 @@ class AcceptorPool; METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep); METRIC_DECLARE_counter(timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep); +METRIC_DECLARE_gauge_int32(rpc_pending_connections); METRIC_DECLARE_histogram(acceptor_dispatch_times); METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep); METRIC_DECLARE_histogram(rpc_incoming_queue_time); @@ -1457,6 +1458,35 @@ TEST_P(TestRpc, AcceptorDispatchingTimesMetric) { }); } +// Basic verification of the 'rpc_pending_connections' metric. +TEST_P(TestRpc, RpcPendingConnectionsMetric) { + Sockaddr server_addr; + ASSERT_OK(StartTestServer(&server_addr)); + + { + Socket socket; + ASSERT_OK(socket.Init(server_addr.family(), /*flags=*/0)); + ASSERT_OK(socket.Connect(server_addr)); + } + + // Get a reference to the already registered metric with the proper callback + // to fetch the necessary information. The fake '{ return -3; }' callback + // is to make sure the actual gauge returns a proper value, + // which is verified below. + auto pending_connections_gauge = + METRIC_rpc_pending_connections.InstantiateFunctionGauge( + server_messenger_->metric_entity(), []() { return -3; }); + + // There should be no connection pending -- the only received connection + // request has been handled already above. The number of pending connections + // is properly reported on Linux only as of now; on macOS it should report -1. 
+#if defined(__linux__) + ASSERT_EQ(0, pending_connections_gauge->value()); +#else + ASSERT_EQ(-1, pending_connections_gauge->value()); +#endif +} + static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger, CountDownLatch* latch) { messenger->reset();