This is an automated email from the ASF dual-hosted git repository.

achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 19bade3d6 [rpc] modernize code a bit
19bade3d6 is described below

commit 19bade3d6ed8422591d32f48df64d3b4cdf5dc55
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Fri Dec 15 19:57:38 2023 -0800

    [rpc] modernize code a bit
    
    Since I'm updating the code in src/kudu/rpc/connection.{h,cc} and
    src/kudu/util/net/socket.{h,cc} in follow-up changelists, I went ahead
    and updated the code to be style-compliant, modernized it, and made
    other miscellaneous improvements before introducing the new functionality.
    
    Change-Id: I2634591426c3e107e4d11b661ff62e5cde8a7570
    Reviewed-on: http://gerrit.cloudera.org:8080/20815
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Mahesh Reddy <mre...@cloudera.com>
    Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com>
---
 src/kudu/rpc/connection.cc  | 288 ++++++++++++++++++++++++--------------------
 src/kudu/rpc/connection.h   |  27 ++---
 src/kudu/rpc/messenger.cc   |  18 ++-
 src/kudu/rpc/messenger.h    |   6 +-
 src/kudu/rpc/proxy.cc       |  54 +++++----
 src/kudu/rpc/reactor.cc     |  23 ++--
 src/kudu/rpc/reactor.h      |   8 +-
 src/kudu/util/net/socket.cc |  94 ++++++++++-----
 src/kudu/util/net/socket.h  |  25 ++--
 9 files changed, 303 insertions(+), 240 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index ddf8dd3e5..c6727b805 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -19,10 +19,14 @@
 
 #include <netinet/in.h>
 #include <netinet/tcp.h>
-#include <string.h>
+#include <sys/socket.h>
+#ifdef __linux__
+#include <sys/ioctl.h>
+#endif
 
 #include <algorithm>
 #include <cerrno>
+#include <cstddef>
 #include <iostream>
 #include <memory>
 #include <set>
@@ -53,15 +57,11 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
-#include <sys/socket.h>
-#ifdef __linux__
-#include <sys/ioctl.h>
-#endif
-
 using kudu::security::TlsSocket;
 using std::includes;
 using std::set;
 using std::shared_ptr;
+using std::string;
 using std::unique_ptr;
 using strings::Substitute;
 
@@ -199,8 +199,8 @@ struct tcp_info {
 ///
 /// Connection
 ///
-Connection::Connection(ReactorThread *reactor_thread,
-                       Sockaddr remote,
+Connection::Connection(ReactorThread* reactor_thread,
+                       const Sockaddr& remote,
                        unique_ptr<Socket> socket,
                        Direction direction,
                        CredentialsPolicy policy)
@@ -231,7 +231,7 @@ Status Connection::SetTcpKeepAlive(int idle_time_s, int 
retry_time_s, int num_re
 
 void Connection::EpollRegister(ev::loop_ref& loop) {
   DCHECK(reactor_thread_->IsCurrentThread());
-  DVLOG(4) << "Registering connection for epoll: " << ToString();
+  DVLOG(4) << Substitute("registering connection for epoll: $0", ToString());
   write_io_.set(loop);
   write_io_.set(socket_->GetFd(), ev::WRITE);
   write_io_.set<Connection, &Connection::WriteHandler>(this);
@@ -259,7 +259,7 @@ Connection::~Connection() {
 bool Connection::Idle() const {
   DCHECK(reactor_thread_->IsCurrentThread());
   // check if we're in the middle of receiving something
-  InboundTransfer *transfer = inbound_.get();
+  InboundTransfer* transfer = inbound_.get();
   if (transfer && (transfer->TransferStarted())) {
     return false;
   }
@@ -284,7 +284,7 @@ bool Connection::Idle() const {
   return true;
 }
 
-void Connection::Shutdown(const Status &status,
+void Connection::Shutdown(const Status& status,
                           unique_ptr<ErrorStatusPB> rpc_error) {
   DCHECK(reactor_thread_->IsCurrentThread());
   shutdown_status_ = status.CloneAndPrepend("RPC connection failed");
@@ -292,16 +292,17 @@ void Connection::Shutdown(const Status &status,
   if (inbound_ && inbound_->TransferStarted()) {
     double secs_since_active =
         (reactor_thread_->cur_time() - last_activity_time_).ToSeconds();
-    LOG(WARNING) << "Shutting down " << ToString()
-                 << " with pending inbound data ("
-                 << inbound_->StatusAsString() << ", last active "
-                 << HumanReadableElapsedTime::ToShortString(secs_since_active)
-                 << " ago, status=" << status.ToString() << ")";
+    LOG(WARNING) << Substitute(
+        "shutting down $0 with pending inbound data: "
+        "$1; last active $2 ago: status $3",
+        ToString(),
+        inbound_->StatusAsString(),
+        HumanReadableElapsedTime::ToShortString(secs_since_active),
+        status.ToString());
   }
 
   // Clear any calls which have been sent and were awaiting a response.
-  for (const car_map_t::value_type &v : awaiting_response_) {
-    CallAwaitingResponse *c = v.second;
+  for (const auto& [_, c] : awaiting_response_) {
     if (c->call) {
       // Make sure every awaiting call receives the error info, if any.
       unique_ptr<ErrorStatusPB> error;
@@ -320,7 +321,7 @@ void Connection::Shutdown(const Status &status,
 
   // Clear any outbound transfers.
   while (!outbound_transfers_.empty()) {
-    OutboundTransfer *t = &outbound_transfers_.front();
+    auto* t = &outbound_transfers_.front();
     outbound_transfers_.pop_front();
     delete t;
   }
@@ -336,14 +337,14 @@ void Connection::Shutdown(const Status &status,
 void Connection::QueueOutbound(unique_ptr<OutboundTransfer> transfer) {
   DCHECK(reactor_thread_->IsCurrentThread());
 
-  if (!shutdown_status_.ok()) {
+  if (PREDICT_FALSE(!shutdown_status_.ok())) {
     // If we've already shut down, then we just need to abort the
     // transfer rather than bothering to queue it.
     transfer->Abort(shutdown_status_);
     return;
   }
 
-  DVLOG(3) << "Queueing transfer: " << transfer->HexDump();
+  DVLOG(3) << Substitute("queueing transfer: $0", transfer->HexDump());
 
   outbound_transfers_.push_back(*transfer.release());
 
@@ -360,14 +361,16 @@ Connection::CallAwaitingResponse::~CallAwaitingResponse() 
{
   DCHECK(conn->reactor_thread_->IsCurrentThread());
 }
 
-void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int 
revents) {
+void Connection::CallAwaitingResponse::HandleTimeout(ev::timer& watcher,
+                                                     int /*revents*/) {
   if (remaining_timeout > 0) {
-    if (watcher.remaining() < -1.0) {
-      LOG(WARNING) << "RPC call timeout handler was delayed by "
-                   << -watcher.remaining() << "s! This may be due to a 
process-wide "
-                   << "pause such as swapping, logging-related delays, or 
allocator lock "
-                   << "contention. Will allow an additional "
-                   << remaining_timeout << "s for a response.";
+    const auto rem = watcher.remaining();
+    if (PREDICT_FALSE(rem < -1.0)) {
+      LOG(WARNING) << Substitute(
+          "RPC call timeout handler was delayed by $0s: this may be due "
+          "to a process-wide pause such as swapping, logging-related delays, "
+          "or allocator lock contention. Will allow extra $1s for a response",
+          rem, remaining_timeout);
     }
 
     watcher.set(remaining_timeout, 0);
@@ -379,7 +382,7 @@ void 
Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int rev
   conn->HandleOutboundCallTimeout(this);
 }
 
-void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
+void Connection::HandleOutboundCallTimeout(CallAwaitingResponse* car) {
   DCHECK(reactor_thread_->IsCurrentThread());
   if (!car->call) {
     // The RPC may have been cancelled before the timeout was hit.
@@ -407,7 +410,7 @@ void 
Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
   // already timed out.
 }
 
-void Connection::CancelOutboundCall(const shared_ptr<OutboundCall> &call) {
+void Connection::CancelOutboundCall(const shared_ptr<OutboundCall>& call) {
   CallAwaitingResponse* car = FindPtrOrNull(awaiting_response_, 
call->call_id());
   if (car != nullptr) {
     // car->call may be NULL if the call has timed out already.
@@ -423,7 +426,7 @@ Status Connection::GetLocalAddress(Sockaddr* addr) const {
 }
 
 // Inject a cancellation when 'call' is in state 
'FLAGS_rpc_inject_cancellation_state'.
-void inline Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall> 
&call) {
+void inline Connection::MaybeInjectCancellation(const 
shared_ptr<OutboundCall>& call) {
   if (PREDICT_FALSE(call->ShouldInjectCancellation())) {
     reactor_thread_->reactor()->messenger()->QueueCancellation(call);
   }
@@ -435,7 +438,7 @@ void inline Connection::MaybeInjectCancellation(const 
shared_ptr<OutboundCall> &
 struct CallTransferCallbacks : public TransferCallbacks {
  public:
   explicit CallTransferCallbacks(shared_ptr<OutboundCall> call,
-                                 Connection *conn)
+                                 Connection* conn)
       : call_(std::move(call)), conn_(conn) {}
 
   void NotifyTransferFinished() override {
@@ -452,9 +455,9 @@ struct CallTransferCallbacks : public TransferCallbacks {
     delete this;
   }
 
-  void NotifyTransferAborted(const Status &status) override {
-    VLOG(1) << "Transfer of RPC call " << call_->ToString() << " aborted: "
-            << status.ToString();
+  void NotifyTransferAborted(const Status& status) override {
+    VLOG(1) << Substitute(
+        "transfer of $0 aborted: $1", call_->ToString(), status.ToString());
     delete this;
   }
 
@@ -484,7 +487,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall> 
call) {
   DCHECK(!call->cancellation_requested());
 
   // Assign the call ID.
-  int32_t call_id = GetNextCallId();
+  const int32_t call_id = GetNextCallId();
   call->set_call_id(call_id);
 
   // Serialize the actual bytes to be put on the wire.
@@ -501,7 +504,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall> 
call) {
   car->call = call;
 
   // Set up the timeout timer.
-  const MonoDelta &timeout = call->controller()->timeout();
+  const auto& timeout = call->controller()->timeout();
   if (timeout.Initialized()) {
     reactor_thread_->RegisterTimeout(&car->timeout_timer);
     car->timeout_timer.set<CallAwaitingResponse, // NOLINT(*)
@@ -544,7 +547,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall> 
call) {
     car->timeout_timer.start();
   }
 
-  TransferCallbacks *cb = new CallTransferCallbacks(std::move(call), this);
+  TransferCallbacks* cb = new CallTransferCallbacks(std::move(call), this);
   awaiting_response_[call_id] = car.release();
   QueueOutbound(unique_ptr<OutboundTransfer>(
       OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, cb)));
@@ -553,18 +556,17 @@ void 
Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) {
 // Callbacks for sending an RPC call response from the server.
 // This takes ownership of the InboundCall object so that, once it has
 // been responded to, we can free up all of the associated memory.
-struct ResponseTransferCallbacks : public TransferCallbacks {
+struct ResponseTransferCallbacks final : public TransferCallbacks {
  public:
-  ResponseTransferCallbacks(unique_ptr<InboundCall> call,
-                            Connection *conn) :
-    call_(std::move(call)),
-    conn_(conn)
-  {}
+  ResponseTransferCallbacks(unique_ptr<InboundCall> call, Connection* conn)
+      : call_(std::move(call)),
+        conn_(conn) {
+  }
 
-  ~ResponseTransferCallbacks() {
+  ~ResponseTransferCallbacks() override {
     // Remove the call from the map.
-    InboundCall *call_from_map = EraseKeyReturnValuePtr(
-      &conn_->calls_being_handled_, call_->call_id());
+    auto* call_from_map = EraseKeyReturnValuePtr(
+        &conn_->calls_being_handled_, call_->call_id());
     DCHECK_EQ(call_from_map, call_.get());
   }
 
@@ -573,38 +575,38 @@ struct ResponseTransferCallbacks : public 
TransferCallbacks {
   }
 
   void NotifyTransferAborted(const Status& /*status*/) override {
-    LOG(WARNING) << "Connection torn down before " <<
-      call_->ToString() << " could send its response";
+    LOG(WARNING) << Substitute(
+        "$0 torn down before $1 could send its response",
+        conn_->ToString(), call_->ToString());
     delete this;
   }
 
  private:
   unique_ptr<InboundCall> call_;
-  Connection *conn_;
+  Connection* conn_;
 };
 
 // Reactor task which puts a transfer on the outbound transfer queue.
 class QueueTransferTask : public ReactorTask {
  public:
-  QueueTransferTask(unique_ptr<OutboundTransfer> transfer,
-                    Connection *conn)
-    : transfer_(std::move(transfer)),
-      conn_(conn)
-  {}
+  QueueTransferTask(unique_ptr<OutboundTransfer> transfer, Connection* conn)
+      : transfer_(std::move(transfer)),
+        conn_(conn) {
+  }
 
   void Run(ReactorThread* /*thr*/) override {
     conn_->QueueOutbound(std::move(transfer_));
     delete this;
   }
 
-  void Abort(const Status &status) override {
+  void Abort(const Status& status) override {
     transfer_->Abort(status);
     delete this;
   }
 
  private:
   unique_ptr<OutboundTransfer> transfer_;
-  Connection *conn_;
+  Connection* conn_;
 };
 
 void Connection::QueueResponseForCall(unique_ptr<InboundCall> call) {
@@ -621,15 +623,15 @@ void 
Connection::QueueResponseForCall(unique_ptr<InboundCall> call) {
   TransferPayload tmp_slices;
   call->SerializeResponseTo(&tmp_slices);
 
-  TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this);
+  TransferCallbacks* cb = new ResponseTransferCallbacks(std::move(call), this);
   // After the response is sent, can delete the InboundCall object.
   // We set a dummy call ID and required feature set, since these are not 
needed
   // when sending responses.
   unique_ptr<OutboundTransfer> t(
       OutboundTransfer::CreateForCallResponse(tmp_slices, cb));
 
-  QueueTransferTask *task = new QueueTransferTask(std::move(t), this);
-  reactor_thread_->reactor()->ScheduleReactorTask(task);
+  reactor_thread_->reactor()->ScheduleReactorTask(
+      new QueueTransferTask(std::move(t), this));
 }
 
 void Connection::set_confidential(bool is_confidential) {
@@ -646,10 +648,10 @@ RpczStore* Connection::rpcz_store() {
   return reactor_thread_->reactor()->messenger()->rpcz_store();
 }
 
-void Connection::ReadHandler(ev::io &watcher, int revents) {
+void Connection::ReadHandler(ev::io& /*watcher*/, int revents) {
   DCHECK(reactor_thread_->IsCurrentThread());
 
-  DVLOG(3) << ToString() << " ReadHandler(revents=" << revents << ")";
+  DVLOG(3) << Substitute("$0 ReadHandler(revents=$1)", ToString(), revents);
   if (revents & EV_ERROR) {
     reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() +
                                      ": ReadHandler encountered an error"));
@@ -665,25 +667,34 @@ void Connection::ReadHandler(ev::io &watcher, int 
revents) {
     Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf);
     if (PREDICT_FALSE(!status.ok())) {
       if (status.posix_code() == ESHUTDOWN) {
-        VLOG(1) << ToString() << " shut down by remote end.";
+        VLOG(1) << Substitute("$0 shut down by remote end", ToString());
       } else {
-        LOG(WARNING) << ToString() << " recv error: " << status.ToString();
+        LOG(WARNING) << Substitute("$0 recv error: $1",
+                                   ToString(), status.ToString());
       }
       reactor_thread_->DestroyConnection(this, status);
       return;
     }
     if (!inbound_->TransferFinished()) {
-      DVLOG(3) << ToString() << ": read is not yet finished yet.";
+      DVLOG(3) << Substitute("$0: read is not yet finished yet", ToString());
       return;
     }
-    DVLOG(3) << ToString() << ": finished reading " << inbound_->data().size() 
<< " bytes";
-
-    if (direction_ == CLIENT) {
-      HandleCallResponse(std::move(inbound_));
-    } else if (direction_ == SERVER) {
-      HandleIncomingCall(std::move(inbound_));
-    } else {
-      LOG(FATAL) << "Invalid direction: " << direction_;
+    DVLOG(3) << Substitute("$0: finished reading $1 bytes",
+                           ToString(), inbound_->data().size());
+
+    switch (direction_) {
+      case CLIENT:
+        HandleCallResponse(std::move(inbound_));
+        break;
+
+      case SERVER:
+        HandleIncomingCall(std::move(inbound_));
+        break;
+
+      default:
+        LOG(DFATAL) << Substitute("$0: invalid direction",
+                                  static_cast<uint16_t>(direction_));
+        break;
     }
 
     if (extra_buf.size() > 0) {
@@ -699,19 +710,24 @@ void 
Connection::HandleIncomingCall(unique_ptr<InboundTransfer> transfer) {
 
   unique_ptr<InboundCall> call(new InboundCall(this));
   Status s = call->ParseFrom(std::move(transfer));
-  if (!s.ok()) {
-    LOG(WARNING) << ToString() << ": received bad data: " << s.ToString();
-    // TODO: shutdown? probably, since any future stuff on this socket will be
-    // "unsynchronized"
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG(WARNING) << Substitute("$0: received bad data: '$1'",
+                             ToString(), s.ToString());
+    // Shutting down the connection since there is a high risk of receiving
+    // "unsynchronized" data on this socket after this error.
+    Shutdown(s);
     return;
   }
 
-  if (!InsertIfNotPresent(&calls_being_handled_, call->call_id(), call.get())) 
{
-    LOG(WARNING) << ToString() << ": received call ID " << call->call_id() <<
-      " but was already processing this ID! Ignoring";
+  if (PREDICT_FALSE(!InsertIfNotPresent(&calls_being_handled_,
+                                        call->call_id(),
+                                        call.get()))) {
+    LOG(WARNING) << Substitute(
+        "$0: received call ID $1 but was already processing this ID, ignoring",
+        ToString(), call->call_id());
     reactor_thread_->DestroyConnection(
-      this, Status::RuntimeError("Received duplicate call id",
-                                 Substitute("$0", call->call_id())));
+        this, Status::RuntimeError("Received duplicate call id",
+                                   Substitute("$0", call->call_id())));
     return;
   }
 
@@ -723,11 +739,12 @@ void 
Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) {
   unique_ptr<CallResponse> resp(new CallResponse);
   CHECK_OK(resp->ParseFrom(std::move(transfer)));
 
-  CallAwaitingResponse *car_ptr =
-    EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id());
+  CallAwaitingResponse* car_ptr = EraseKeyReturnValuePtr(
+      &awaiting_response_, resp->call_id());
   if (PREDICT_FALSE(car_ptr == nullptr)) {
-    LOG(WARNING) << ToString() << ": Got a response for call id " << 
resp->call_id() << " which "
-                 << "was not pending! Ignoring.";
+    LOG(WARNING) << Substitute(
+        "$0: got a response for call id $1 which was not pending, ignoring",
+        ToString(), resp->call_id());
     return;
   }
 
@@ -736,8 +753,9 @@ void 
Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) {
 
   if (PREDICT_FALSE(!car->call)) {
     // The call already failed due to a timeout.
-    VLOG(1) << "Got response to call id " << resp->call_id() << " after client 
"
-            << "already timed out or cancelled";
+    VLOG(1) << Substitute(
+        "got response to call id $0 after client already timed out or 
cancelled",
+         resp->call_id());
     return;
   }
 
@@ -747,7 +765,7 @@ void 
Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) {
   MaybeInjectCancellation(car->call);
 }
 
-void Connection::WriteHandler(ev::io &watcher, int revents) {
+void Connection::WriteHandler(ev::io& /*watcher*/, int revents) {
   DCHECK(reactor_thread_->IsCurrentThread());
 
   if (revents & EV_ERROR) {
@@ -755,11 +773,12 @@ void Connection::WriteHandler(ev::io &watcher, int 
revents) {
           ": writeHandler encountered an error"));
     return;
   }
-  DVLOG(3) << ToString() << ": writeHandler: revents = " << revents;
+  DVLOG(3) << Substitute("$0: writeHandler: revents=$1", ToString(), revents);
 
   if (outbound_transfers_.empty()) {
-    LOG(WARNING) << ToString() << " got a ready-to-write callback, but there 
is "
-      "nothing to write.";
+    LOG(WARNING) << Substitute(
+        "$0 got a ready-to-write callback, but there is nothing to write",
+        ToString());
     write_io_.stop();
     return;
   }
@@ -770,7 +789,7 @@ void Connection::WriteHandler(ev::io &watcher, int revents) 
{
 
 Connection::ProcessOutboundTransfersResult 
Connection::ProcessOutboundTransfers() {
   while (!outbound_transfers_.empty()) {
-    OutboundTransfer* transfer = &(outbound_transfers_.front());
+    OutboundTransfer* transfer = &outbound_transfers_.front();
 
     if (!transfer->TransferStarted()) {
       if (transfer->is_for_outbound_call()) {
@@ -813,13 +832,14 @@ Connection::ProcessOutboundTransfersResult 
Connection::ProcessOutboundTransfers(
     last_activity_time_ = reactor_thread_->cur_time();
     Status status = transfer->SendBuffer(socket_.get());
     if (PREDICT_FALSE(!status.ok())) {
-      LOG(WARNING) << ToString() << " send error: " << status.ToString();
+      LOG(WARNING) << Substitute(
+          "$0 send error: $1", ToString(), status.ToString());
       reactor_thread_->DestroyConnection(this, status);
       return kConnectionDestroyed;
     }
 
     if (!transfer->TransferFinished()) {
-      DVLOG(3) << ToString() << ": writeHandler: xfer not finished.";
+      DVLOG(3) << Substitute("$0: writeHandler: xfer not finished", 
ToString());
       return kMoreToSend;
     }
 
@@ -830,14 +850,14 @@ Connection::ProcessOutboundTransfersResult 
Connection::ProcessOutboundTransfers(
   return kNoMoreToSend;
 }
 
-std::string Connection::ToString() const {
+string Connection::ToString() const {
   // This may be called from other threads, so we cannot
   // include anything in the output about the current state,
   // which might concurrently change from another thread.
-  return strings::Substitute(
-    "$0 $1",
-    direction_ == SERVER ? "server connection from" : "client connection to",
-    remote_.ToString());
+  return Substitute("$0 $1",
+                    direction_ == SERVER
+                        ? "server connection from"
+                        : "client connection to", remote_.ToString());
 }
 
 // Reactor task that transitions this Connection from connection negotiation to
@@ -847,22 +867,21 @@ class NegotiationCompletedTask : public ReactorTask {
   NegotiationCompletedTask(Connection* conn,
                            Status negotiation_status,
                            std::unique_ptr<ErrorStatusPB> rpc_error)
-    : conn_(conn),
-      negotiation_status_(std::move(negotiation_status)),
-      rpc_error_(std::move(rpc_error)) {
+      : conn_(conn),
+        negotiation_status_(std::move(negotiation_status)),
+        rpc_error_(std::move(rpc_error)) {
   }
 
-  void Run(ReactorThread *rthread) override {
+  void Run(ReactorThread* rthread) override {
     rthread->CompleteConnectionNegotiation(conn_,
                                            negotiation_status_,
                                            std::move(rpc_error_));
     delete this;
   }
 
-  void Abort(const Status &status) override {
+  void Abort(const Status& status) override {
     DCHECK(conn_->reactor_thread()->reactor()->closing());
-    VLOG(1) << "Failed connection negotiation due to shut down reactor thread: 
"
-            << status.ToString();
+    VLOG(1) << Substitute("connection negotiation aborted: $0", 
status.ToString());
     delete this;
   }
 
@@ -885,7 +904,7 @@ void Connection::MarkNegotiationComplete() {
 }
 
 Status Connection::DumpPB(const DumpConnectionsRequestPB& req,
-                          RpcConnectionPB* resp) {
+                          RpcConnectionPB* resp) const {
   DCHECK(reactor_thread_->IsCurrentThread());
   resp->set_remote_ip(remote_.ToString());
   if (negotiation_complete_) {
@@ -894,27 +913,31 @@ Status Connection::DumpPB(const DumpConnectionsRequestPB& 
req,
     resp->set_state(RpcConnectionPB::NEGOTIATING);
   }
 
-  if (direction_ == CLIENT) {
-    for (const car_map_t::value_type& entry : awaiting_response_) {
-      CallAwaitingResponse* c = entry.second;
-      if (c->call) {
-        c->call->DumpPB(req, resp->add_calls_in_flight());
+  switch (direction_) {
+    case CLIENT:
+      for (const auto& [_, c]: awaiting_response_) {
+        if (c->call) {
+          c->call->DumpPB(req, resp->add_calls_in_flight());
+        }
       }
-    }
+      resp->set_outbound_queue_size(outbound_transfers_.size());
+      break;
 
-    resp->set_outbound_queue_size(num_queued_outbound_transfers());
-  } else if (direction_ == SERVER) {
-    if (negotiation_complete_) {
-      // It's racy to dump credentials while negotiating, since the Connection
-      // object is owned by the negotiation thread at that point.
-      resp->set_remote_user_credentials(remote_user_.ToString());
-    }
-    for (const inbound_call_map_t::value_type& entry : calls_being_handled_) {
-      InboundCall* c = entry.second;
-      c->DumpPB(req, resp->add_calls_in_flight());
-    }
-  } else {
-    LOG(FATAL);
+    case SERVER:
+      if (negotiation_complete_) {
+        // It's racy to dump credentials while negotiating, since the 
Connection
+        // object is owned by the negotiation thread at that point.
+        resp->set_remote_user_credentials(remote_user_.ToString());
+      }
+      for (const auto& [_, c]: calls_being_handled_) {
+        c->DumpPB(req, resp->add_calls_in_flight());
+      }
+      break;
+
+    default:
+      LOG(DFATAL) << Substitute("$0: invalid direction",
+                                static_cast<uint16_t>(direction_));
+      break;
   }
 #ifdef __linux__
   if (negotiation_complete_ && remote_.is_ip()) {
@@ -937,11 +960,10 @@ Status Connection::DumpPB(const DumpConnectionsRequestPB& 
req,
 Status Connection::GetSocketStatsPB(SocketStatsPB* pb) const {
   DCHECK(reactor_thread_->IsCurrentThread());
   int fd = socket_->GetFd();
-  CHECK_GE(fd, 0);
+  DCHECK_GE(fd, 0);
 
   // Fetch TCP_INFO statistics from the kernel.
-  tcp_info ti;
-  memset(&ti, 0, sizeof(ti));
+  tcp_info ti = {};
   socklen_t len = sizeof(ti);
   int rc = getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len);
   if (rc == 0) {
@@ -1026,12 +1048,12 @@ Status 
Connection::GetTransportDetailsPB(TransportDetailsPB* pb) const {
     tls->set_cipher_suite(tls_socket->GetCipherDescription());
   }
 
-  int fd = socket_->GetFd();
-  CHECK_GE(fd, 0);
+  const int fd = socket_->GetFd();
+  DCHECK_GE(fd, 0);
   int32_t max_seg_size = 0;
   socklen_t optlen = sizeof(max_seg_size);
   int ret = ::getsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, &max_seg_size, &optlen);
-  if (ret) {
+  if (PREDICT_FALSE(ret)) {
     int err = errno;
     return Status::NetworkError(
         "getsockopt(TCP_MAXSEG) failed", ErrnoToString(err), err);
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index d0afbe536..eab48f54e 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -16,7 +16,6 @@
 // under the License.
 #pragma once
 
-#include <cstddef>
 #include <cstdint>
 #include <limits>
 #include <memory>
@@ -89,8 +88,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // remote: the address of the remote end
   // socket: the socket to take ownership of.
   // direction: whether we are the client or server side
-  Connection(ReactorThread *reactor_thread,
-             Sockaddr remote,
+  Connection(ReactorThread* reactor_thread,
+             const Sockaddr& remote,
              std::unique_ptr<Socket> socket,
              Direction direction,
              CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS);
@@ -138,7 +137,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
 
   // Cancel an outbound call by removing any reference to it by 
CallAwaitingResponse
   // in 'awaiting_responses_'.
-  void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+  void CancelOutboundCall(const std::shared_ptr<OutboundCall>& call);
 
   // The address of the remote end of the connection.
   const Sockaddr& remote() const { return remote_; }
@@ -184,10 +183,10 @@ class Connection : public 
RefCountedThreadSafe<Connection> {
   RpczStore* rpcz_store();
 
   // libev callback when data is available to read.
-  void ReadHandler(ev::io &watcher, int revents);
+  void ReadHandler(ev::io& watcher, int revents); // 
NOLINT(google-runtime-references)
 
   // libev callback when we may write to the socket.
-  void WriteHandler(ev::io &watcher, int revents);
+  void WriteHandler(ev::io& watcher, int revents);// 
NOLINT(google-runtime-references)
 
   enum ProcessOutboundTransfersResult {
     // All of the transfers in the queue have been sent successfully.
@@ -224,7 +223,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
   void MarkNegotiationComplete();
 
   Status DumpPB(const DumpConnectionsRequestPB& req,
-                RpcConnectionPB* resp);
+                RpcConnectionPB* resp) const;
 
   ReactorThread* reactor_thread() const { return reactor_thread_; }
 
@@ -263,10 +262,6 @@ class Connection : public RefCountedThreadSafe<Connection> 
{
     scheduled_for_shutdown_ = true;
   }
 
-  size_t num_queued_outbound_transfers() const {
-    return outbound_transfers_.size();
-  }
-
  private:
   friend struct CallAwaitingResponse;
   friend class QueueTransferTask;
@@ -279,9 +274,9 @@ class Connection : public RefCountedThreadSafe<Connection> {
     ~CallAwaitingResponse();
 
     // Notification from libev that the call has timed out.
-    void HandleTimeout(ev::timer &watcher, int revents);
+    void HandleTimeout(ev::timer& watcher, int revents);  // 
NOLINT(google-runtime-references)
 
-    Connection *conn;
+    Connection* conn;
     std::shared_ptr<OutboundCall> call;
     ev::timer timeout_timer;
 
@@ -312,7 +307,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
 
   // The given CallAwaitingResponse has elapsed its user-defined timeout.
   // Set it to Failed.
-  void HandleOutboundCallTimeout(CallAwaitingResponse *car);
+  void HandleOutboundCallTimeout(CallAwaitingResponse* car);
 
   // Queue a transfer for sending on this connection.
   // We will take ownership of the transfer.
@@ -321,7 +316,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
 
   // Internal test function for injecting cancellation request when 'call'
   // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
-  void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);
+  void MaybeInjectCancellation(const std::shared_ptr<OutboundCall>& call);
 
   Status GetSocketStatsPB(SocketStatsPB* pb) const;
 
@@ -344,7 +339,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
   RemoteUser remote_user_;
 
   // whether we are client or server
-  Direction direction_;
+  const Direction direction_;
 
   // The last time we read or wrote from the socket.
   MonoTime last_activity_time_;
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 6820490b3..df1046005 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -269,9 +269,8 @@ Status Messenger::UnregisterService(const string& 
service_name) {
   return Status::OK();
 }
 
-void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
-  Reactor *reactor = RemoteToReactor(call->conn_id().remote());
-  reactor->QueueOutboundCall(call);
+void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
+  RemoteToReactor(call->conn_id().remote())->QueueOutboundCall(call);
 }
 
 void Messenger::QueueInboundCall(unique_ptr<InboundCall> call) {
@@ -304,17 +303,16 @@ void Messenger::QueueInboundCall(unique_ptr<InboundCall> call) {
   WARN_NOT_OK((*service)->QueueInboundCall(std::move(call)), "Unable to handle RPC call");
 }
 
-void Messenger::QueueCancellation(const shared_ptr<OutboundCall> &call) {
-  Reactor *reactor = RemoteToReactor(call->conn_id().remote());
+void Messenger::QueueCancellation(const shared_ptr<OutboundCall>& call) {
+  Reactor* reactor = RemoteToReactor(call->conn_id().remote());
   reactor->QueueCancellation(call);
 }
 
-void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) {
-  Reactor *reactor = RemoteToReactor(remote);
-  reactor->RegisterInboundSocket(new_socket, remote);
+void Messenger::RegisterInboundSocket(Socket* new_socket, const Sockaddr& remote) {
+  RemoteToReactor(remote)->RegisterInboundSocket(new_socket, remote);
 }
 
-Messenger::Messenger(const MessengerBuilder &bld)
+Messenger::Messenger(const MessengerBuilder& bld)
     : name_(bld.name_),
       state_(kStarted),
       authentication_(RpcAuthentication::REQUIRED),
@@ -352,7 +350,7 @@ Messenger::~Messenger() {
   STLDeleteElements(&reactors_);
 }
 
-Reactor* Messenger::RemoteToReactor(const Sockaddr& remote) {
+Reactor* Messenger::RemoteToReactor(const Sockaddr& remote) const {
   // This is just a static partitioning; we could get a lot
   // fancier with assigning Sockaddrs to Reactors.
   return reactors_[remote.HashCode() % reactors_.size()];
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 753681d46..a8044fca4 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -203,7 +203,7 @@ class MessengerBuilder {
   // list and
   // https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_ciphersuites.html
   // for SSL_CTX_set_ciphersuites() API details.
-  MessengerBuilder &set_rpc_tls_ciphersuites(
+  MessengerBuilder& set_rpc_tls_ciphersuites(
       const std::string& rpc_tls_ciphersuites) {
     rpc_tls_ciphersuites_ = rpc_tls_ciphersuites;
     return *this;
@@ -211,7 +211,7 @@ class MessengerBuilder {
 
   // Set the minimum protocol version to allow when for securing RPC connections
   // with TLS. May be one of 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3'.
-  MessengerBuilder &set_rpc_tls_min_protocol(
+  MessengerBuilder& set_rpc_tls_min_protocol(
       const std::string& rpc_tls_min_protocol) {
     rpc_tls_min_protocol_ = rpc_tls_min_protocol;
     return *this;
@@ -473,7 +473,7 @@ class Messenger {
 
   explicit Messenger(const MessengerBuilder& bld);
 
-  Reactor* RemoteToReactor(const Sockaddr& remote);
+  Reactor* RemoteToReactor(const Sockaddr& remote) const;
   Status Init();
   void RunTimeoutThread();
   void UpdateCurTime();
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 9501a388a..b6b355960 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -20,6 +20,7 @@
 #include <functional>
 #include <iostream>
 #include <memory>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -27,7 +28,7 @@
 
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/messenger.h" // IWYU pragma: keep
 #include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/remote_method.h"
 #include "kudu/rpc/response_callback.h"
@@ -43,6 +44,7 @@
 #include "kudu/util/user.h"
 
 using google::protobuf::Message;
+using std::make_shared;
 using std::string;
 using std::shared_ptr;
 using std::unique_ptr;
@@ -52,7 +54,7 @@ using strings::Substitute;
 namespace kudu {
 namespace rpc {
 
-Proxy::Proxy(std::shared_ptr<Messenger> messenger,
+Proxy::Proxy(shared_ptr<Messenger> messenger,
              const Sockaddr& remote,
              string hostname,
              string service_name)
@@ -60,16 +62,16 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
       dns_resolver_(nullptr),
       messenger_(std::move(messenger)),
       is_started_(false) {
-  CHECK(messenger_ != nullptr);
+  DCHECK(messenger_);
   DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
   DCHECK(remote.is_initialized());
   // By default, we set the real user to the currently logged-in user.
   // Effective user and password remain blank.
   string real_user;
-  Status s = GetLoggedInUser(&real_user);
-  if (!s.ok()) {
-    LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
-        << s.ToString() << " before connecting to remote: " << remote.ToString();
+  if (auto s = GetLoggedInUser(&real_user); !s.ok()) {
+    LOG(WARNING) << Substitute("$0: unable to get logged-in username "
+                               "before connecting to remote $1 via $2 proxy",
+                               s.ToString(), remote.ToString(), service_name_);
   }
 
   UserCredentials creds;
@@ -77,7 +79,7 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
   conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds));
 }
 
-Proxy::Proxy(std::shared_ptr<Messenger> messenger,
+Proxy::Proxy(shared_ptr<Messenger> messenger,
              HostPort hp,
              DnsResolver* dns_resolver,
              string service_name)
@@ -86,12 +88,12 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
       dns_resolver_(dns_resolver),
       messenger_(std::move(messenger)),
       is_started_(false) {
-  CHECK(messenger_ != nullptr);
+  DCHECK(messenger_);
   DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
   DCHECK(hp_.Initialized());
 }
 
-Sockaddr* Proxy::GetSingleSockaddr(std::vector<Sockaddr>* addrs) const {
+Sockaddr* Proxy::GetSingleSockaddr(vector<Sockaddr>* addrs) const {
   DCHECK(!addrs->empty());
   if (PREDICT_FALSE(addrs->size() > 1)) {
     LOG(WARNING) << Substitute(
@@ -110,8 +112,9 @@ void Proxy::Init(Sockaddr addr) {
   string real_user;
   Status s = GetLoggedInUser(&real_user);
   if (!s.ok()) {
-    LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
-        << s.ToString() << " before connecting to host/port: " << hp_.ToString();
+    LOG(WARNING) << Substitute(
+      "$0: unable to get logged-in username before connecting to $1 via $2 proxy",
+      s.ToString(), hp_.ToString(), service_name_);
   }
   vector<Sockaddr> addrs;
   if (!addr.is_initialized()) {
@@ -141,9 +144,14 @@ void Proxy::EnqueueRequest(const string& method,
                            OutboundCall::CallbackBehavior cb_behavior) const {
   ConnectionId connection = conn_id();
   DCHECK(connection.remote().is_initialized());
-  controller->call_.reset(
-      new OutboundCall(connection, {service_name_, method}, std::move(req_payload),
-                       cb_behavior, response, controller, callback));
+  controller->call_ = make_shared<OutboundCall>(
+      connection,
+      RemoteMethod{service_name_, method},
+      std::move(req_payload),
+      cb_behavior,
+      response,
+      controller,
+      callback);
   controller->SetMessenger(messenger_.get());
 
   // If this fails to queue, the callback will get called immediately
@@ -151,7 +159,7 @@ void Proxy::EnqueueRequest(const string& method,
   messenger_->QueueOutboundCall(controller->call_);
 }
 
-void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method,
+void Proxy::RefreshDnsAndEnqueueRequest(const string& method,
                                         unique_ptr<RequestPayload> req_payload,
                                         google::protobuf::Message* response,
                                         RpcController* controller,
@@ -169,7 +177,7 @@ void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method,
       // NOTE: we need to keep a reference here because the callback may end up
       // destructing the controller and the outbound call, _while_ the callback
       // is running from within the call!
-      auto shared_call = std::make_shared<OutboundCall>(
+      auto shared_call = make_shared<OutboundCall>(
           conn_id(), RemoteMethod{service_name_, method}, response, controller, callback);
       controller->call_ = shared_call;
       controller->call_->SetFailed(s.CloneAndPrepend("failed to refresh physical address"));
@@ -194,7 +202,7 @@ void Proxy::AsyncRequest(const string& method,
                          google::protobuf::Message* response,
                          RpcController* controller,
                          const ResponseCallback& callback) {
-  CHECK(!controller->call_) << "Controller should be reset";
+  DCHECK(!controller->call_) << "Controller should be reset";
   base::subtle::NoBarrier_Store(&is_started_, true);
   // TODO(awong): it would be great if we didn't have to heap allocate the
   // payload.
@@ -261,18 +269,18 @@ Status Proxy::SyncRequest(const string& method,
 }
 
 void Proxy::set_user_credentials(UserCredentials user_credentials) {
-  CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
-    << "It is illegal to call set_user_credentials() after request processing has started";
+  DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
+      << "illegal to call set_user_credentials() after request processing has started";
   conn_id_.set_user_credentials(std::move(user_credentials));
 }
 
 void Proxy::set_network_plane(string network_plane) {
-  CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
-    << "It is illegal to call set_network_plane() after request processing has started";
+  DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
+      << "illegal to call set_network_plane() after request processing has started";
   conn_id_.set_network_plane(std::move(network_plane));
 }
 
-std::string Proxy::ToString() const {
+string Proxy::ToString() const {
   return Substitute("$0@$1", service_name_, conn_id_.ToString());
 }
 
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 64ef18861..815119ed8 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -72,6 +72,7 @@ static const int kDefaultLibEvFlags = ev::KQUEUE;
 static const int kDefaultLibEvFlags = ev::AUTO;
 #endif
 
+using std::function;
 using std::string;
 using std::shared_ptr;
 using std::unique_ptr;
@@ -233,7 +234,7 @@ void ReactorThread::PollCompleteCb(struct ev_loop* loop) noexcept {
 }
 
 void ReactorThread::Shutdown(Messenger::ShutdownMode mode) {
-  CHECK(reactor_->closing()) << "Should be called after setting closing_ flag";
+  DCHECK(reactor_->closing()) << "Should be called after setting closing_ flag";
 
   VLOG(1) << name() << ": shutting down Reactor thread.";
   WakeThread();
@@ -241,7 +242,11 @@ void ReactorThread::Shutdown(Messenger::ShutdownMode mode) {
   if (mode == Messenger::ShutdownMode::SYNC) {
     // Join() will return a bad status if asked to join on the currently
     // running thread.
-    CHECK_OK(ThreadJoiner(thread_.get()).Join());
+    const auto s = ThreadJoiner(thread_.get()).Join();
+    if (PREDICT_FALSE(!s.ok())) {
+      LOG(DFATAL) << Substitute(
+          "$0: failed to join $1", s.ToString(), thread_->ToString());
+    }
   }
 }
 
@@ -707,7 +712,8 @@ void ReactorThread::DestroyConnection(Connection* conn,
   // Unlink connection from lists.
   if (conn->direction() == Connection::CLIENT) {
     const auto range = client_conns_.equal_range(conn->outbound_connection_id());
-    CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString();
+    DCHECK(range.first != range.second)
+        << "couldn't find connection " << conn->ToString();
     // The client_conns_ container is a multi-map.
     for (auto it = range.first; it != range.second;) {
       if (it->second.get() == conn) {
@@ -728,8 +734,7 @@ void ReactorThread::DestroyConnection(Connection* conn,
   }
 }
 
-DelayedTask::DelayedTask(std::function<void(const Status&)> func,
-                         MonoDelta when)
+DelayedTask::DelayedTask(function<void(const Status&)> func, MonoDelta when)
     : func_(std::move(func)),
       when_(when),
       thread_(nullptr) {
@@ -821,7 +826,7 @@ bool Reactor::closing() const {
 // Task to call an arbitrary function within the reactor thread.
 class RunFunctionTask : public ReactorTask {
  public:
-  explicit RunFunctionTask(std::function<Status()> f)
+  explicit RunFunctionTask(function<Status()> f)
       : function_(std::move(f)), latch_(1) {}
 
   void Run(ReactorThread* /*reactor*/) override {
@@ -841,7 +846,7 @@ class RunFunctionTask : public ReactorTask {
   }
 
  private:
-  const std::function<Status()> function_;
+  const function<Status()> function_;
   Status status_;
   CountDownLatch latch_;
 };
@@ -850,7 +855,7 @@ Status Reactor::GetMetrics(ReactorMetrics* metrics) {
   return RunOnReactorThread([&]() { return this->thread_.GetMetrics(metrics); });
 }
 
-Status Reactor::RunOnReactorThread(std::function<Status()> f) {
+Status Reactor::RunOnReactorThread(function<Status()> f) {
   RunFunctionTask task(std::move(f));
   ScheduleReactorTask(&task);
   return task.Wait();
@@ -950,7 +955,7 @@ void Reactor::ScheduleReactorTask(ReactorTask* task) {
   bool was_empty;
   {
     std::unique_lock<LockType> l(lock_);
-    if (closing_) {
+    if (PREDICT_FALSE(closing_)) {
       // We guarantee the reactor lock is not taken when calling Abort().
       l.unlock();
       task->Abort(ShutdownError(false));
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index 8a966c01f..8944a2b2d 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -173,12 +173,12 @@ class ReactorThread {
   void RegisterTimeout(ev::timer* watcher);
 
   // This may be called from another thread.
-  const std::string &name() const;
+  const std::string& name() const;
 
   MonoTime cur_time() const;
 
   // This may be called from another thread.
-  Reactor *reactor();
+  Reactor* reactor();
 
   // Return true if this reactor thread is the thread currently
   // running. Should be used in DCHECK assertions.
@@ -196,7 +196,7 @@ class ReactorThread {
 
   // Collect metrics.
   // Must be called from the reactor thread.
-  Status GetMetrics(ReactorMetrics *metrics);
+  Status GetMetrics(ReactorMetrics* metrics);
 
  private:
   friend class AssignOutboundCallTask;
@@ -302,7 +302,7 @@ class ReactorThread {
   // List of current connections coming into the server.
   conn_list_t server_conns_;
 
-  Reactor *reactor_;
+  Reactor* reactor_;
 
   // If a connection has been idle for this much time, it is torn down.
   const MonoDelta connection_keepalive_time_;
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index 1ce2da66d..d92c349b7 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -30,6 +30,7 @@
 #include <limits>
 #include <ostream>
 #include <string>
+#include <utility>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -61,20 +62,62 @@ TAG_FLAG(socket_inject_short_recvs, hidden);
 TAG_FLAG(socket_inject_short_recvs, unsafe);
 
 using std::string;
+using std::numeric_limits;
 using strings::Substitute;
 
 namespace kudu {
 
+namespace {
+
+Status ParseIpAddress(const string& addr_str, Sockaddr* result) {
+  DCHECK(!addr_str.empty());
+  Sockaddr bind_host;
+  const auto s = bind_host.ParseString(addr_str, 0);
+  if (PREDICT_FALSE(!s.ok() || bind_host.port() != 0)) {
+    if (!s.ok()) {
+      return Status::InvalidArgument(
+          Substitute("$0: invalid local IP address", addr_str), s.ToString());
+    }
+    return Status::InvalidArgument(
+        Substitute("$0: unexpected port with IP address", addr_str));
+  }
+
+  if (result) {
+    *result = std::move(bind_host);
+  }
+  return Status::OK();
+}
+
+bool ValidateLocalIpForOutboundSockets(
+    const char* flagname, const string& value) {
+  if (value.empty()) {
+    // The default value should pass the validation.
+    return true;
+  }
+
+  if (auto s = ParseIpAddress(value, nullptr); !s.ok()) {
+    LOG(ERROR) << Substitute("invalid local IP '$0' for --$1: $2",
+                             value, flagname, s.ToString());
+    return false;
+  }
+  return true;
+}
+DEFINE_validator(local_ip_for_outbound_sockets,
+                 &ValidateLocalIpForOutboundSockets);
+
+} // anonymous namespace
+
+
 Socket::Socket()
-  : fd_(-1) {
+    : fd_(-1) {
 }
 
 Socket::Socket(int fd)
-  : fd_(fd) {
+    : fd_(fd) {
 }
 
 Socket::Socket(Socket&& other) noexcept
-  : fd_(other.Release()) {
+    : fd_(other.Release()) {
 }
 
 void Socket::Reset(int fd) {
@@ -259,12 +302,11 @@ Status Socket::SetReusePort(bool flag) {
   #endif
 }
 
-Status Socket::BindAndListen(const Sockaddr &sockaddr,
+Status Socket::BindAndListen(const Sockaddr& sockaddr,
                              int listen_queue_size) {
   RETURN_NOT_OK(SetReuseAddr(true));
   RETURN_NOT_OK(Bind(sockaddr));
-  RETURN_NOT_OK(Listen(listen_queue_size));
-  return Status::OK();
+  return Listen(listen_queue_size);
 }
 
 Status Socket::Listen(int listen_queue_size) {
@@ -275,7 +317,7 @@ Status Socket::Listen(int listen_queue_size) {
   return Status::OK();
 }
 
-Status Socket::GetSocketAddress(Sockaddr *cur_addr) const {
+Status Socket::GetSocketAddress(Sockaddr* cur_addr) const {
   struct sockaddr_storage ss;
   socklen_t len = sizeof(ss);
   DCHECK_GE(fd_, 0);
@@ -287,7 +329,7 @@ Status Socket::GetSocketAddress(Sockaddr *cur_addr) const {
   return Status::OK();
 }
 
-Status Socket::GetPeerAddress(Sockaddr *cur_addr) const {
+Status Socket::GetPeerAddress(Sockaddr* cur_addr) const {
   struct sockaddr_storage addr;
   socklen_t len = sizeof(addr);
   DCHECK_GE(fd_, 0);
@@ -332,7 +374,7 @@ Status Socket::Bind(const Sockaddr& bind_addr) {
   return Status::OK();
 }
 
-Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
+Status Socket::Accept(Socket* new_conn, Sockaddr* remote, int flags) {
   TRACE_EVENT0("net", "Socket::Accept");
   struct sockaddr_storage addr;
   socklen_t olen = sizeof(addr);
@@ -371,16 +413,11 @@ Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
 
 Status Socket::BindForOutgoingConnection() {
   Sockaddr bind_host;
-  Status s = bind_host.ParseString(FLAGS_local_ip_for_outbound_sockets, 0);
-  CHECK(s.ok() && bind_host.port() == 0)
-    << "Invalid local IP set for 'local_ip_for_outbound_sockets': '"
-    << FLAGS_local_ip_for_outbound_sockets << "': " << s.ToString();
-
-  RETURN_NOT_OK(Bind(bind_host));
-  return Status::OK();
+  RETURN_NOT_OK(ParseIpAddress(FLAGS_local_ip_for_outbound_sockets, &bind_host));
+  return Bind(bind_host);
 }
 
-Status Socket::Connect(const Sockaddr &remote) {
+Status Socket::Connect(const Sockaddr& remote) {
   TRACE_EVENT1("net", "Socket::Connect",
                "remote", remote.ToString());
   if (PREDICT_FALSE(!FLAGS_local_ip_for_outbound_sockets.empty())) {
@@ -412,7 +449,7 @@ Status Socket::GetSockError() const {
   return Status::OK();
 }
 
-Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
+Status Socket::Write(const uint8_t* buf, int32_t amt, int32_t* nwritten) {
   if (amt <= 0) {
     return Status::NetworkError(
               StringPrintf("invalid send of %" PRId32 " bytes",
@@ -429,8 +466,9 @@ Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
   return Status::OK();
 }
 
-Status Socket::Writev(const struct ::iovec *iov, int iov_len,
-                      int64_t *nwritten) {
+Status Socket::Writev(const struct ::iovec* iov,
+                      int iov_len,
+                      int64_t* nwritten) {
   if (PREDICT_FALSE(iov_len <= 0)) {
     return Status::NetworkError(
                 StringPrintf("writev: invalid io vector length of %d",
@@ -441,7 +479,7 @@ Status Socket::Writev(const struct ::iovec *iov, int iov_len,
 
   struct msghdr msg;
   memset(&msg, 0, sizeof(struct msghdr));
-  msg.msg_iov = const_cast<iovec *>(iov);
+  msg.msg_iov = const_cast<iovec*>(iov);
   msg.msg_iovlen = iov_len;
   ssize_t res;
   RETRY_ON_EINTR(res, ::sendmsg(fd_, &msg, MSG_NOSIGNAL));
@@ -455,9 +493,9 @@ Status Socket::Writev(const struct ::iovec *iov, int iov_len,
 }
 
 // Mostly follows writen() from Stevens (2004) or Kerrisk (2010).
-Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+Status Socket::BlockingWrite(const uint8_t* buf, size_t buflen, size_t* nwritten,
     const MonoTime& deadline) {
-  DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";
+  DCHECK_LE(buflen, numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";
   DCHECK(nwritten);
 
   size_t tot_written = 0;
@@ -492,15 +530,15 @@ Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten
     }
   }
 
-  if (tot_written < buflen) {
+  if (PREDICT_FALSE(tot_written < buflen)) {
     return Status::IOError("Wrote zero bytes on a BlockingWrite() call",
         StringPrintf("Transferred %zu of %zu bytes", tot_written, buflen));
   }
   return Status::OK();
 }
 
-Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
-  if (amt <= 0) {
+Status Socket::Recv(uint8_t* buf, int32_t amt, int32_t* nread) {
+  if (PREDICT_FALSE(amt <= 0)) {
     return Status::NetworkError(
           StringPrintf("invalid recv of %d bytes", amt), Slice(), EINVAL);
   }
@@ -535,8 +573,8 @@ Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
 
 // Mostly follows readn() from Stevens (2004) or Kerrisk (2010).
 // One place where we deviate: we consider EOF a failure if < amt bytes are read.
-Status Socket::BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline) {
-  DCHECK_LE(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported";
+Status Socket::BlockingRecv(uint8_t* buf, size_t amt, size_t* nread, const MonoTime& deadline) {
+  DCHECK_LE(amt, numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported";
   DCHECK(nread);
   size_t tot_read = 0;
   while (tot_read < amt) {
diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h
index 830a389e9..2ec3b071c 100644
--- a/src/kudu/util/net/socket.h
+++ b/src/kudu/util/net/socket.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_UTIL_NET_SOCKET_H
-#define KUDU_UTIL_NET_SOCKET_H
+#pragma once
 
 #include <cstddef>
 #include <cstdint>
@@ -94,18 +93,18 @@ class Socket {
   // 1) SetReuseAddr(true)
   // 2) Bind()
   // 3) Listen()
-  Status BindAndListen(const Sockaddr &sockaddr, int listen_queue_size);
+  Status BindAndListen(const Sockaddr& sockaddr, int listen_queue_size);
 
   // Start listening for new connections, with the given backlog size.
   // Requires that the socket has already been bound using Bind().
   Status Listen(int listen_queue_size);
 
   // Call getsockname to get the address of this socket.
-  Status GetSocketAddress(Sockaddr *cur_addr) const;
+  Status GetSocketAddress(Sockaddr* cur_addr) const;
 
   // Call getpeername to get the address of the connected peer.
   // It is virtual so that tests can override.
-  virtual Status GetPeerAddress(Sockaddr *cur_addr) const;
+  virtual Status GetPeerAddress(Sockaddr* cur_addr) const;
 
   // Return true if this socket is determined to be a loopback connection
   // (i.e. the local and remote peer share an IP address).
@@ -119,10 +118,10 @@ class Socket {
   Status Bind(const Sockaddr& bind_addr);
 
   // Call accept(2) to get a new connection.
-  Status Accept(Socket *new_conn, Sockaddr *remote, int flags);
+  Status Accept(Socket* new_conn, Sockaddr* remote, int flags);
 
   // start connecting this socket to a remote address.
-  Status Connect(const Sockaddr &remote);
+  Status Connect(const Sockaddr& remote);
 
   // get the error status using getsockopt(2)
   Status GetSockError() const;
@@ -130,30 +129,30 @@ class Socket {
   // Write up to 'amt' bytes from 'buf' to the socket. The number of bytes
   // actually written will be stored in 'nwritten'. If an error is returned,
   // the value of 'nwritten' is undefined.
-  virtual Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten);
+  virtual Status Write(const uint8_t* buf, int32_t amt, int32_t* nwritten);
 
   // Vectorized Write.
   // If there is an error, that error needs to be resolved before calling again.
   // If there was no error, but not all the bytes were written, the unwritten
   // bytes must be retried. See writev(2) for more information.
-  virtual Status Writev(const struct ::iovec *iov, int iov_len, int64_t *nwritten);
+  virtual Status Writev(const struct ::iovec* iov, int iov_len, int64_t* nwritten);
 
   // Blocking Write call, returns IOError unless full buffer is sent.
   // Underlying Socket expected to be in blocking mode. Fails if any Write() sends 0 bytes.
   // Returns OK if buflen bytes were sent, otherwise IOError.
   // Upon return, nwritten will contain the number of bytes actually written.
   // See also writen() from Stevens (2004) or Kerrisk (2010)
-  Status BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+  Status BlockingWrite(const uint8_t* buf, size_t buflen, size_t* nwritten,
       const MonoTime& deadline);
 
-  virtual Status Recv(uint8_t *buf, int32_t amt, int32_t *nread);
+  virtual Status Recv(uint8_t* buf, int32_t amt, int32_t* nread);
 
   // Blocking Recv call, returns IOError unless requested amt bytes are read.
   // Underlying Socket expected to be in blocking mode. Fails if any Recv() reads 0 bytes.
   // Returns OK if amt bytes were read, otherwise IOError.
   // Upon return, nread will contain the number of bytes actually read.
   // See also readn() from Stevens (2004) or Kerrisk (2010)
-  Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline);
+  Status BlockingRecv(uint8_t* buf, size_t amt, size_t* nread, const MonoTime& deadline);
 
   // Enable TCP keepalive for the underlying socket. A TCP keepalive probe will be sent
   // to the remote end after the connection has been idle for 'idle_time_s' seconds.
@@ -183,5 +182,3 @@ class Socket {
 };
 
 } // namespace kudu
-
-#endif

Reply via email to