This is an automated email from the ASF dual-hosted git repository.

achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 2871977a3 [util] move TCP socket stats into the Socket class
2871977a3 is described below

commit 2871977a362949ef9868485b3cf114b7aedccb51
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Fri Dec 8 17:50:22 2023 -0800

    [util] move TCP socket stats into the Socket class
    
    This patch moves the protobuf data structures for TCP socket statistics
    and transport details from src/kudu/rpc/rpc_introspection.proto into
    src/kudu/util/net/socket_info.proto, separating them into a standalone
    file.  The code that used to retrieve statistics on an RPC connection's
    TCP socket has been moved into the Socket class correspondingly.  With
    that, it's now possible to collect statistics from a TCP socket which is
    not yet a transport for any KRPC connection.  This is also a cleaner
    design with respect to the TransportDetailsPB::TlsDetails message: the
    updated code no longer needs to down-cast the Socket to a TlsSocket.
    
    These updates are required by follow-up patches that add functionality
    for collecting TCP statistics on a Socket instance that a Kudu RPC
    server listens on.
    
    Change-Id: I48ba7915f5153bae288b462b07965cdea7b7957b
    Reviewed-on: http://gerrit.cloudera.org:8080/20772
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com>
---
 src/kudu/rpc/CMakeLists.txt          |   7 +-
 src/kudu/rpc/connection.cc           | 252 +----------------------------------
 src/kudu/rpc/connection.h            |   6 -
 src/kudu/rpc/rpc_introspection.proto |  54 +-------
 src/kudu/security/CMakeLists.txt     |   7 +-
 src/kudu/security/tls_socket.cc      |  10 +-
 src/kudu/security/tls_socket.h       |   6 +-
 src/kudu/util/CMakeLists.txt         |  15 +++
 src/kudu/util/net/socket.cc          | 218 ++++++++++++++++++++++++++++++
 src/kudu/util/net/socket.h           |   7 +
 src/kudu/util/net/socket_info.proto  |  75 +++++++++++
 11 files changed, 343 insertions(+), 314 deletions(-)

diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 7a967c22a..6b042ad9a 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -32,8 +32,9 @@ PROTOBUF_GENERATE_CPP(
   BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
   PROTO_FILES rpc_introspection.proto)
 set(RPC_INTROSPECTION_PROTO_LIBS
+  protobuf
   rpc_header_proto
-  protobuf)
+  socket_info_proto)
 ADD_EXPORTABLE_LIBRARY(rpc_introspection_proto
   SRCS ${RPC_INTROSPECTION_PROTO_SRCS}
   DEPS ${RPC_INTROSPECTION_PROTO_LIBS}
@@ -83,7 +84,9 @@ set(KRPC_LIBS
   libev
   rpc_header_proto
   rpc_introspection_proto
-  security)
+  security
+  socket_info_proto
+)
 
 ADD_EXPORTABLE_LIBRARY(krpc
   SRCS ${KRPC_SRCS}
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index c6727b805..dfb8ec79e 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -17,16 +17,8 @@
 
 #include "kudu/rpc/connection.h"
 
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <sys/socket.h>
-#ifdef __linux__
-#include <sys/ioctl.h>
-#endif
-
 #include <algorithm>
 #include <cerrno>
-#include <cstddef>
 #include <iostream>
 #include <memory>
 #include <set>
@@ -49,15 +41,12 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/transfer.h"
-#include "kudu/security/tls_socket.h"
-#include "kudu/util/errno.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
-using kudu::security::TlsSocket;
 using std::includes;
 using std::set;
 using std::shared_ptr;
@@ -70,132 +59,6 @@ namespace rpc {
 
 typedef OutboundCall::Phase Phase;
 
-namespace {
-
-// tcp_info struct duplicated from linux/tcp.h.
-//
-// This allows us to decouple the compile-time Linux headers from the
-// runtime Linux kernel. The compile-time headers (and kernel) might be
-// older than the runtime kernel, in which case an ifdef-based approach
-// wouldn't allow us to get all of the info available.
-//
-// NOTE: this struct has been annotated with some local notes about the
-// contents of each field.
-struct tcp_info {
-  // Various state-tracking information.
-  // ------------------------------------------------------------
-  uint8_t    tcpi_state;
-  uint8_t    tcpi_ca_state;
-  uint8_t    tcpi_retransmits;
-  uint8_t    tcpi_probes;
-  uint8_t    tcpi_backoff;
-  uint8_t    tcpi_options;
-  uint8_t    tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
-  uint8_t    tcpi_delivery_rate_app_limited:1;
-
-  // Configurations.
-  // ------------------------------------------------------------
-  uint32_t   tcpi_rto;
-  uint32_t   tcpi_ato;
-  uint32_t   tcpi_snd_mss;
-  uint32_t   tcpi_rcv_mss;
-
-  // Counts of packets in various states in the outbound queue.
-  // At first glance one might think these are monotonic counters, but
-  // in fact they are instantaneous counts of queued packets and thus
-  // not very useful for our purposes.
-  // ------------------------------------------------------------
-  // Number of packets outstanding that haven't been acked.
-  uint32_t   tcpi_unacked;
-
-  // Number of packets outstanding that have been selective-acked.
-  uint32_t   tcpi_sacked;
-
-  // Number of packets outstanding that have been deemed lost (a SACK arrived
-  // for a later packet)
-  uint32_t   tcpi_lost;
-
-  // Number of packets in the queue that have been retransmitted.
-  uint32_t   tcpi_retrans;
-
-  // The number of packets towards the highest SACKed sequence number
-  // (some measure of reording, removed in later Linux versions by
-  // 737ff314563ca27f044f9a3a041e9d42491ef7ce)
-  uint32_t   tcpi_fackets;
-
-  // Times when various events occurred.
-  // ------------------------------------------------------------
-  uint32_t   tcpi_last_data_sent;
-  uint32_t   tcpi_last_ack_sent;     /* Not remembered, sorry. */
-  uint32_t   tcpi_last_data_recv;
-  uint32_t   tcpi_last_ack_recv;
-
-  // Path MTU.
-  uint32_t   tcpi_pmtu;
-
-  // Receiver slow start threshold.
-  uint32_t   tcpi_rcv_ssthresh;
-
-  // Smoothed RTT estimate and variance based on the time between sending data 
and receiving
-  // corresponding ACK. See https://tools.ietf.org/html/rfc2988 for details.
-  uint32_t   tcpi_rtt;
-  uint32_t   tcpi_rttvar;
-
-  // Slow start threshold.
-  uint32_t   tcpi_snd_ssthresh;
-  // Sender congestion window (in number of MSS-sized packets)
-  uint32_t   tcpi_snd_cwnd;
-  // Advertised MSS.
-  uint32_t   tcpi_advmss;
-  // Amount of packet reordering allowed.
-  uint32_t   tcpi_reordering;
-
-  // Receiver-side RTT estimate per the Dynamic Right Sizing algorithm:
-  //
-  // "A system that is only transmitting acknowledgements can still estimate 
the round-trip
-  // time by observing the time between when a byte is first acknowledged and 
the receipt of
-  // data that is at least one window beyond the sequence number that was 
acknowledged. If the
-  // sender is being throttled by the network, this estimate will be valid. 
However, if the
-  // sending application did not have any data to send, the measured time 
could be much larger
-  // than the actual round-trip time. Thus this measurement acts only as an 
upper-bound on the
-  // round-trip time and should be be used only when it is the only source of 
round-trip time
-  // information."
-  uint32_t   tcpi_rcv_rtt;
-  uint32_t   tcpi_rcv_space;
-
-  // Total number of retransmitted packets.
-  uint32_t   tcpi_total_retrans;
-
-  // Pacing-related metrics.
-  uint64_t   tcpi_pacing_rate;
-  uint64_t   tcpi_max_pacing_rate;
-
-  // Total bytes ACKed by remote peer.
-  uint64_t   tcpi_bytes_acked;    /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
-  // Total bytes received (for which ACKs have been sent out).
-  uint64_t   tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived 
*/
-  // Segments sent and received.
-  uint32_t   tcpi_segs_out;       /* RFC4898 tcpEStatsPerfSegsOut */
-  uint32_t   tcpi_segs_in;        /* RFC4898 tcpEStatsPerfSegsIn */
-
-  // The following metrics are quite new and not in el7.
-  // ------------------------------------------------------------
-  uint32_t   tcpi_notsent_bytes;
-  uint32_t   tcpi_min_rtt;
-  uint32_t   tcpi_data_segs_in;      /* RFC4898 tcpEStatsDataSegsIn */
-  uint32_t   tcpi_data_segs_out;     /* RFC4898 tcpEStatsDataSegsOut */
-
-  // Calculated rate at which data was delivered.
-  uint64_t   tcpi_delivery_rate;
-
-  // Timers for various states.
-  uint64_t   tcpi_busy_time;      /* Time (usec) busy sending data */
-  uint64_t   tcpi_rwnd_limited;   /* Time (usec) limited by receive window */
-  uint64_t   tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
-};
-
-} // anonymous namespace
-
 ///
 /// Connection
 ///
@@ -939,129 +802,22 @@ Status Connection::DumpPB(const DumpConnectionsRequestPB& req,
                                 static_cast<uint16_t>(direction_));
       break;
   }
-#ifdef __linux__
+#if defined(__linux__)
   if (negotiation_complete_ && remote_.is_ip()) {
     // TODO(todd): it's a little strange to not set socket level stats during
     // negotiation, but we don't have access to the socket here until 
negotiation
     // is complete.
-    WARN_NOT_OK(GetSocketStatsPB(resp->mutable_socket_stats()),
+    WARN_NOT_OK(socket_->GetStats(resp->mutable_socket_stats()),
                 "could not fill in TCP info for RPC connection");
   }
-#endif // __linux__
+#endif // #if defined(__linux__) ...
 
   if (negotiation_complete_ && remote_.is_ip()) {
-    WARN_NOT_OK(GetTransportDetailsPB(resp->mutable_transport_details()),
+    
WARN_NOT_OK(socket_->GetTransportDetails(resp->mutable_transport_details()),
                 "could not fill in transport info for RPC connection");
   }
   return Status::OK();
 }
 
-#ifdef __linux__
-Status Connection::GetSocketStatsPB(SocketStatsPB* pb) const {
-  DCHECK(reactor_thread_->IsCurrentThread());
-  int fd = socket_->GetFd();
-  DCHECK_GE(fd, 0);
-
-  // Fetch TCP_INFO statistics from the kernel.
-  tcp_info ti = {};
-  socklen_t len = sizeof(ti);
-  int rc = getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len);
-  if (rc == 0) {
-#   define HAS_FIELD(field_name) \
-        (len >= offsetof(tcp_info, field_name) + sizeof(ti.field_name))
-    if (!HAS_FIELD(tcpi_total_retrans)) {
-      // All the fields up through tcpi_total_retrans were present since very 
old
-      // kernel versions, beyond our minimal supported. So, we can just bail 
if we
-      // don't get sufficient data back.
-      return Status::NotSupported("bad length returned for TCP_INFO");
-    }
-
-    pb->set_rtt(ti.tcpi_rtt);
-    pb->set_rttvar(ti.tcpi_rttvar);
-    pb->set_snd_cwnd(ti.tcpi_snd_cwnd);
-    pb->set_total_retrans(ti.tcpi_total_retrans);
-
-    // The following fields were added later in kernel development history.
-    // In RHEL6 they were backported starting in 6.8. Even though they were
-    // backported all together as a group, we'll just be safe and check for
-    // each individually.
-    if (HAS_FIELD(tcpi_pacing_rate)) {
-      pb->set_pacing_rate(ti.tcpi_pacing_rate);
-    }
-    if (HAS_FIELD(tcpi_max_pacing_rate)) {
-      pb->set_max_pacing_rate(ti.tcpi_max_pacing_rate);
-    }
-    if (HAS_FIELD(tcpi_bytes_acked)) {
-      pb->set_bytes_acked(ti.tcpi_bytes_acked);
-    }
-    if (HAS_FIELD(tcpi_bytes_received)) {
-      pb->set_bytes_received(ti.tcpi_bytes_received);
-    }
-    if (HAS_FIELD(tcpi_segs_out)) {
-      pb->set_segs_out(ti.tcpi_segs_out);
-    }
-    if (HAS_FIELD(tcpi_segs_in)) {
-      pb->set_segs_in(ti.tcpi_segs_in);
-    }
-
-    // Calculate sender bandwidth based on the same logic used by the 'ss' 
utility.
-    if (ti.tcpi_rtt > 0 && ti.tcpi_snd_mss && ti.tcpi_snd_cwnd) {
-      // Units:
-      //  rtt = usec
-      //  cwnd = number of MSS-size packets
-      //  mss = bytes / packet
-      //
-      // Dimensional analysis:
-      //   packets * bytes/packet * usecs/sec / usec -> bytes/sec
-      static constexpr int kUsecsPerSec = 1000000;
-      pb->set_send_bytes_per_sec(static_cast<int64_t>(ti.tcpi_snd_cwnd) *
-                                 ti.tcpi_snd_mss * kUsecsPerSec / ti.tcpi_rtt);
-    }
-  }
-
-  // Fetch the queue sizes.
-  int queue_len = 0;
-  rc = ioctl(fd, TIOCOUTQ, &queue_len);
-  if (rc == 0) {
-    pb->set_send_queue_bytes(queue_len);
-  }
-  rc = ioctl(fd, FIONREAD, &queue_len);
-  if (rc == 0) {
-    pb->set_receive_queue_bytes(queue_len);
-  }
-  return Status::OK();
-}
-#endif // __linux__
-
-Status Connection::GetTransportDetailsPB(TransportDetailsPB* pb) const {
-  DCHECK(reactor_thread_->IsCurrentThread());
-  DCHECK(pb);
-
-  // As for the dynamic_cast below: this is not very elegant or performant 
code,
-  // but introducing a generic virtual method with vague semantics into the 
base
-  // Socket class doesn't look like a good choice either. Also, the
-  // GetTransportDetailsPB() method isn't supposed to be a part of any hot 
path.
-  const TlsSocket* tls_socket = dynamic_cast<TlsSocket*>(socket_.get());
-  if (tls_socket) {
-    auto* tls = pb->mutable_tls();
-    tls->set_protocol(tls_socket->GetProtocolName());
-    tls->set_cipher_suite(tls_socket->GetCipherDescription());
-  }
-
-  const int fd = socket_->GetFd();
-  DCHECK_GE(fd, 0);
-  int32_t max_seg_size = 0;
-  socklen_t optlen = sizeof(max_seg_size);
-  int ret = ::getsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, &max_seg_size, &optlen);
-  if (PREDICT_FALSE(ret)) {
-    int err = errno;
-    return Status::NetworkError(
-        "getsockopt(TCP_MAXSEG) failed", ErrnoToString(err), err);
-  }
-  pb->mutable_tcp()->set_max_segment_size(max_seg_size);
-
-  return Status::OK();
-}
-
 } // namespace rpc
 } // namespace kudu
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index eab48f54e..27026efd5 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -52,8 +52,6 @@ class OutboundCall;
 class ReactorThread;
 class RpcConnectionPB;
 class RpczStore;
-class SocketStatsPB;
-class TransportDetailsPB;
 
 enum class CredentialsPolicy;
 
@@ -318,10 +316,6 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
   void MaybeInjectCancellation(const std::shared_ptr<OutboundCall>& call);
 
-  Status GetSocketStatsPB(SocketStatsPB* pb) const;
-
-  Status GetTransportDetailsPB(TransportDetailsPB* pb) const;
-
   // The reactor thread that created this connection.
   ReactorThread* const reactor_thread_;
 
diff --git a/src/kudu/rpc/rpc_introspection.proto b/src/kudu/rpc/rpc_introspection.proto
index 9a81372ef..f747eee2d 100644
--- a/src/kudu/rpc/rpc_introspection.proto
+++ b/src/kudu/rpc/rpc_introspection.proto
@@ -24,6 +24,7 @@ package kudu.rpc;
 option java_package = "org.apache.kudu";
 
 import "kudu/rpc/rpc_header.proto";
+import "kudu/util/net/socket_info.proto";
 
 message RpcCallInProgressPB {
   required RequestHeader header = 1;
@@ -50,59 +51,6 @@ message RpcCallInProgressPB {
   optional State state = 4;
 }
 
-// The SocketStatsPB message is used to report on socket-level information
-// for RPC-related TCP connections (Linux-only). Essentially, the message
-// contains some metrics and counters from Linux-specific 'tcp_info' structure
-// defined in /usr/include/linux/tcp.h. For more information on the TCP on
-// Linux, see http://man7.org/linux/man-pages/man7/tcp.7.html
-message SocketStatsPB {
-  optional uint32 rtt = 1;
-  optional uint32 rttvar = 2;
-  optional uint32 snd_cwnd = 3;
-  optional uint32 total_retrans = 4;
-
-  optional uint64 pacing_rate = 5;
-  optional uint64 max_pacing_rate = 6;
-
-  optional uint64 bytes_acked = 7;
-  optional uint64 bytes_received = 8;
-  optional uint32 segs_out = 9;
-  optional uint32 segs_in = 10;
-
-  optional uint64 send_queue_bytes = 11;
-  optional uint64 receive_queue_bytes = 12;
-
-  // Calculated sender throughput.
-  optional int64 send_bytes_per_sec = 13;
-}
-
-// Transport-related information for an RPC connection.
-message TransportDetailsPB {
-
-  // TCP-specific details.
-  message TcpDetails {
-    // Maximum segment size for the packets: this directly maps into the
-    // TCP_MAXSEG socket option.
-    optional int32 max_segment_size = 1;
-  }
-
-  // TLS-specific details.
-  //
-  // NOTE: TLS/SSL doesn't map nicely into a single layer of the TCP/IP or
-  // the OSI model, but intuitively that's something related to the transport.
-  message TlsDetails {
-    // The name of the TLS protocol negotiated to protect the connection
-    // (e.g. TLSv1.3).
-    optional string protocol = 1;
-
-    // Description of the TLS cipher suite used.
-    optional string cipher_suite = 2;
-  }
-
-  optional TcpDetails tcp = 1;
-  optional TlsDetails tls = 2;
-}
-
 // Debugging information about a currently-open RPC connection.
 message RpcConnectionPB {
   enum StateType {
diff --git a/src/kudu/security/CMakeLists.txt b/src/kudu/security/CMakeLists.txt
index 266aa352f..4131eada1 100644
--- a/src/kudu/security/CMakeLists.txt
+++ b/src/kudu/security/CMakeLists.txt
@@ -79,17 +79,18 @@ set(SECURITY_SRCS
   token_verifier.cc
   token_signer.cc
   token_signing_key.cc
-  )
+)
 
 set(SECURITY_LIBS
   gutil
   kudu_util
   token_proto
-
   gssapi_krb5
   krb5
   openssl_crypto
-  openssl_ssl)
+  openssl_ssl
+  socket_info_proto
+)
 
 ADD_EXPORTABLE_LIBRARY(security
   SRCS ${SECURITY_SRCS}
diff --git a/src/kudu/security/tls_socket.cc b/src/kudu/security/tls_socket.cc
index 088c8f7e7..a17ffdfc1 100644
--- a/src/kudu/security/tls_socket.cc
+++ b/src/kudu/security/tls_socket.cc
@@ -23,7 +23,6 @@
 
 #include <cerrno>
 #include <cstddef>
-#include <functional>
 #include <string>
 #include <utility>
 
@@ -34,6 +33,7 @@
 #include "kudu/util/errno.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/net/socket_info.pb.h"
 #include "kudu/util/openssl_util.h"
 
 using std::string;
@@ -257,6 +257,14 @@ Status TlsSocket::Close() {
   return ssl_shutdown;
 }
 
+// Reports the negotiated TLS protocol and cipher suite in 'pb->tls', then
+// delegates to Socket::GetTransportDetails() for the TCP-level details.
+Status TlsSocket::GetTransportDetails(TransportDetailsPB* pb) const {
+  DCHECK(pb);
+  TransportDetailsPB::TlsDetails* tls_details = pb->mutable_tls();
+  tls_details->set_protocol(GetProtocolName());
+  tls_details->set_cipher_suite(GetCipherDescription());
+
+  // The TCP-level part is the same as for a plain (non-TLS) socket.
+  return Socket::GetTransportDetails(pb);
+}
+
 string TlsSocket::GetProtocolName() const {
   return ::kudu::security::GetProtocolName(ssl_.get());
 }
diff --git a/src/kudu/security/tls_socket.h b/src/kudu/security/tls_socket.h
index 539e1721b..f21a57d6d 100644
--- a/src/kudu/security/tls_socket.h
+++ b/src/kudu/security/tls_socket.h
@@ -20,7 +20,6 @@
 #include <openssl/ssl.h>
 
 #include <cstdint>
-#include <memory>
 #include <string>
 
 #include "kudu/gutil/port.h"
@@ -34,6 +33,9 @@ struct iovec;
 typedef struct ssl_st SSL;
 
 namespace kudu {
+
+class TransportDetailsPB;
+
 namespace security {
 
 class TlsSocket : public Socket {
@@ -51,6 +53,8 @@ class TlsSocket : public Socket {
 
   Status Close() override WARN_UNUSED_RESULT;
 
+  Status GetTransportDetails(TransportDetailsPB* pb) const override;
+
   // Get the name of the negotiated TLS protocol for the connection.
   std::string GetProtocolName() const;
 
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index b6a64940e..f4d8e5a09 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -113,6 +113,20 @@ ADD_EXPORTABLE_LIBRARY(pb_util_proto
   DEPS protobuf
   NONLINK_DEPS ${PB_UTIL_PROTO_TGTS})
 
+#######################################
+# socket_info_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+  SOCKET_INFO_PROTO_SRCS SOCKET_INFO_PROTO_HDRS SOCKET_INFO_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES net/socket_info.proto)
+ADD_EXPORTABLE_LIBRARY(socket_info_proto
+  SRCS ${SOCKET_INFO_PROTO_SRCS}
+  DEPS protobuf
+  NONLINK_DEPS ${SOCKET_INFO_PROTO_TGTS})
+
 #######################################
 # version_info_proto
 #######################################
@@ -301,6 +315,7 @@ set(UTIL_LIBS
   protobuf
   openssl_crypto
   openssl_ssl
+  socket_info_proto
   version_info_proto
   yaml
   zlib)
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index d92c349b7..75ff53544 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -21,6 +21,7 @@
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <sys/socket.h>
+#include <sys/ioctl.h>
 #include <sys/time.h>
 #include <unistd.h>
 
@@ -45,6 +46,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket_info.pb.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
@@ -108,6 +110,132 @@ DEFINE_validator(local_ip_for_outbound_sockets,
 } // anonymous namespace
 
 
+namespace {
+
+// tcp_info struct duplicated from linux/tcp.h.
+//
+// This allows us to decouple the compile-time Linux headers from the
+// runtime Linux kernel. The compile-time headers (and kernel) might be
+// older than the runtime kernel, in which case an ifdef-based approach
+// wouldn't allow us to get all of the info available.
+//
+// The field order, types and bit-field widths must mirror the kernel's
+// definition exactly. Socket::GetStats() passes this struct to
+// getsockopt(TCP_INFO) and compares the returned length against offsetof()
+// of individual fields to decide which fields the running kernel filled in.
+//
+// NOTE: this struct has been annotated with some local notes about the
+// contents of each field.
+struct tcp_info { // NOLINT(readability-identifier-naming)
+  // Various state-tracking information.
+  // ------------------------------------------------------------
+  uint8_t    tcpi_state;
+  uint8_t    tcpi_ca_state;
+  uint8_t    tcpi_retransmits;
+  uint8_t    tcpi_probes;
+  uint8_t    tcpi_backoff;
+  uint8_t    tcpi_options;
+  uint8_t    tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
+  uint8_t    tcpi_delivery_rate_app_limited:1;
+
+  // Configurations.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_rto;
+  uint32_t   tcpi_ato;
+  uint32_t   tcpi_snd_mss;
+  uint32_t   tcpi_rcv_mss;
+
+  // Counts of packets in various states in the outbound queue.
+  // At first glance one might think these are monotonic counters, but
+  // in fact they are instantaneous counts of queued packets and thus
+  // not very useful for our purposes.
+  // ------------------------------------------------------------
+  // Number of packets outstanding that haven't been acked.
+  uint32_t   tcpi_unacked;
+
+  // Number of packets outstanding that have been selective-acked.
+  uint32_t   tcpi_sacked;
+
+  // Number of packets outstanding that have been deemed lost (a SACK arrived
+  // for a later packet).
+  uint32_t   tcpi_lost;
+
+  // Number of packets in the queue that have been retransmitted.
+  uint32_t   tcpi_retrans;
+
+  // The number of packets towards the highest SACKed sequence number
+  // (some measure of reordering, removed in later Linux versions by
+  // 737ff314563ca27f044f9a3a041e9d42491ef7ce).
+  uint32_t   tcpi_fackets;
+
+  // Times when various events occurred.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_last_data_sent;
+  uint32_t   tcpi_last_ack_sent;     /* Not remembered, sorry. */
+  uint32_t   tcpi_last_data_recv;
+  uint32_t   tcpi_last_ack_recv;
+
+  // Path MTU.
+  uint32_t   tcpi_pmtu;
+
+  // Receiver slow start threshold.
+  uint32_t   tcpi_rcv_ssthresh;
+
+  // Smoothed RTT estimate and variance based on the time between sending
+  // data and receiving the corresponding ACK.
+  // See https://tools.ietf.org/html/rfc2988 for details.
+  uint32_t   tcpi_rtt;
+  uint32_t   tcpi_rttvar;
+
+  // Slow start threshold.
+  uint32_t   tcpi_snd_ssthresh;
+  // Sender congestion window (in number of MSS-sized packets)
+  uint32_t   tcpi_snd_cwnd;
+  // Advertised MSS.
+  uint32_t   tcpi_advmss;
+  // Amount of packet reordering allowed.
+  uint32_t   tcpi_reordering;
+
+  // Receiver-side RTT estimate per the Dynamic Right Sizing algorithm:
+  //
+  // "A system that is only transmitting acknowledgements can still estimate
+  // the round-trip time by observing the time between when a byte is first
+  // acknowledged and the receipt of data that is at least one window beyond
+  // the sequence number that was acknowledged. If the sender is being
+  // throttled by the network, this estimate will be valid. However, if the
+  // sending application did not have any data to send, the measured time
+  // could be much larger than the actual round-trip time. Thus this
+  // measurement acts only as an upper-bound on the round-trip time and
+  // should be used only when it is the only source of round-trip time
+  // information."
+  uint32_t   tcpi_rcv_rtt;
+  uint32_t   tcpi_rcv_space;
+
+  // Total number of retransmitted packets.
+  // NOTE: Socket::GetStats() treats this as the last field that every
+  // supported kernel is expected to report.
+  uint32_t   tcpi_total_retrans;
+
+  // Pacing-related metrics.
+  uint64_t   tcpi_pacing_rate;
+  uint64_t   tcpi_max_pacing_rate;
+
+  // Total bytes ACKed by remote peer.
+  uint64_t   tcpi_bytes_acked;    /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
+  // Total bytes received (for which ACKs have been sent out).
+  uint64_t   tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */
+  // Segments sent and received.
+  uint32_t   tcpi_segs_out;       /* RFC4898 tcpEStatsPerfSegsOut */
+  uint32_t   tcpi_segs_in;        /* RFC4898 tcpEStatsPerfSegsIn */
+
+  // The following metrics are quite new and not in el7.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_notsent_bytes;
+  uint32_t   tcpi_min_rtt;
+  uint32_t   tcpi_data_segs_in;      /* RFC4898 tcpEStatsDataSegsIn */
+  uint32_t   tcpi_data_segs_out;     /* RFC4898 tcpEStatsDataSegsOut */
+
+  // Calculated rate at which data was delivered.
+  uint64_t   tcpi_delivery_rate;
+
+  // Timers for various states.
+  uint64_t   tcpi_busy_time;      /* Time (usec) busy sending data */
+  uint64_t   tcpi_rwnd_limited;   /* Time (usec) limited by receive window */
+  uint64_t   tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
+};
+
+} // anonymous namespace
+
 Socket::Socket()
     : fd_(-1) {
 }
@@ -615,6 +743,96 @@ Status Socket::BlockingRecv(uint8_t* buf, size_t amt, size_t* nread, const MonoT
   return Status::OK();
 }
 
+#if defined(__linux__)
+// Fills in 'pb' with statistics for this TCP socket:
+//   * metrics from the kernel's TCP_INFO socket option, if getsockopt()
+//     succeeds; fields the running kernel doesn't report are left unset;
+//   * the number of bytes in the send queue (ioctl(TIOCOUTQ)) and in the
+//     receive queue (ioctl(FIONREAD)), if the corresponding ioctl succeeds.
+//
+// Collection is best-effort: a failed getsockopt()/ioctl() call just leaves
+// the corresponding fields unset. Returns Status::NotSupported if the kernel
+// returns TCP_INFO data too short to contain even the basic fields; in that
+// case the queue sizes are not collected either.
+Status Socket::GetStats(SocketStatsPB* pb) const {
+  DCHECK(pb);
+  DCHECK_GE(fd_, 0);
+
+// Whether the TCP_INFO data returned by the kernel ('len' bytes) is long
+// enough to include the 'field_name' field of 'ti'. Older kernels return
+// shorter versions of the structure.
+#define HAS_FIELD(field_name) \
+        (len >= offsetof(tcp_info, field_name) + sizeof(ti.field_name))
+
+  // Fetch TCP_INFO statistics from the kernel.
+  tcp_info ti = {};
+  socklen_t len = sizeof(ti);
+  if (int rc = getsockopt(fd_, IPPROTO_TCP, TCP_INFO, &ti, &len); rc == 0) {
+    if (!HAS_FIELD(tcpi_total_retrans)) {
+      // All the fields up through tcpi_total_retrans were present since very old
+      // kernel versions, beyond our minimal supported. So, we can just bail if we
+      // don't get sufficient data back.
+      return Status::NotSupported("bad length returned for TCP_INFO");
+    }
+
+    pb->set_rtt(ti.tcpi_rtt);
+    pb->set_rttvar(ti.tcpi_rttvar);
+    pb->set_snd_cwnd(ti.tcpi_snd_cwnd);
+    pb->set_total_retrans(ti.tcpi_total_retrans);
+
+    // The following fields were added later in kernel development history.
+    // In RHEL6 they were backported starting in 6.8. Even though they were
+    // backported all together as a group, we'll just be safe and check for
+    // each individually.
+    if (HAS_FIELD(tcpi_pacing_rate)) {
+      pb->set_pacing_rate(ti.tcpi_pacing_rate);
+    }
+    if (HAS_FIELD(tcpi_max_pacing_rate)) {
+      pb->set_max_pacing_rate(ti.tcpi_max_pacing_rate);
+    }
+    if (HAS_FIELD(tcpi_bytes_acked)) {
+      pb->set_bytes_acked(ti.tcpi_bytes_acked);
+    }
+    if (HAS_FIELD(tcpi_bytes_received)) {
+      pb->set_bytes_received(ti.tcpi_bytes_received);
+    }
+    if (HAS_FIELD(tcpi_segs_out)) {
+      pb->set_segs_out(ti.tcpi_segs_out);
+    }
+    if (HAS_FIELD(tcpi_segs_in)) {
+      pb->set_segs_in(ti.tcpi_segs_in);
+    }
+
+    // Calculate sender bandwidth based on the same logic used by the 'ss' utility.
+    if (ti.tcpi_rtt > 0 && ti.tcpi_snd_mss && ti.tcpi_snd_cwnd) {
+      // Units:
+      //  rtt = usec
+      //  cwnd = number of MSS-size packets
+      //  mss = bytes / packet
+      //
+      // Dimensional analysis:
+      //   packets * bytes/packet * usecs/sec / usec -> bytes/sec
+      static constexpr int kUsecsPerSec = 1000000;
+      pb->set_send_bytes_per_sec(static_cast<int64_t>(ti.tcpi_snd_cwnd) *
+                                 ti.tcpi_snd_mss * kUsecsPerSec / ti.tcpi_rtt);
+    }
+  }
+#undef HAS_FIELD
+
+  // Fetch the queue sizes.
+  int queue_len = 0;
+  if (int rc = ioctl(fd_, TIOCOUTQ, &queue_len); rc == 0) {
+    pb->set_send_queue_bytes(queue_len);
+  }
+  if (int rc = ioctl(fd_, FIONREAD, &queue_len); rc == 0) {
+    pb->set_receive_queue_bytes(queue_len);
+  }
+  return Status::OK();
+}
+#else
+// TCP_INFO and the queue-size ioctls used on Linux aren't available here.
+// GetStats() is declared unconditionally in socket.h, so define it on other
+// platforms too: callers not guarded by '#if defined(__linux__)' still link
+// and get an explicit error at run time.
+Status Socket::GetStats(SocketStatsPB* /* pb */) const {
+  return Status::NotSupported("TCP socket statistics are supported only on Linux");
+}
+#endif // #if defined(__linux__) ...
+
+// Reports TCP-level transport details for this socket: currently the
+// maximum segment size, as read via getsockopt(TCP_MAXSEG). Returns
+// Status::NetworkError if the socket option can't be read.
+Status Socket::GetTransportDetails(TransportDetailsPB* pb) const {
+  DCHECK(pb);
+  DCHECK_GE(fd_, 0);
+
+  int32_t mss = 0;
+  socklen_t mss_len = sizeof(mss);
+  if (::getsockopt(fd_, IPPROTO_TCP, TCP_MAXSEG, &mss, &mss_len) != 0) {
+    // Capture errno right away, before anything else can clobber it.
+    const int err = errno;
+    return Status::NetworkError(
+        "getsockopt(TCP_MAXSEG) failed", ErrnoToString(err), err);
+  }
+  pb->mutable_tcp()->set_max_segment_size(mss);
+  return Status::OK();
+}
+
 Status Socket::SetTimeout(int opt, const char* optname, const MonoDelta& 
timeout) {
   if (PREDICT_FALSE(timeout.ToNanoseconds() < 0)) {
     return Status::InvalidArgument("Timeout specified as negative to 
SetTimeout",
diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h
index 2ec3b071c..a301ebafd 100644
--- a/src/kudu/util/net/socket.h
+++ b/src/kudu/util/net/socket.h
@@ -26,6 +26,9 @@ struct iovec;
 
 namespace kudu {
 
+class SocketStatsPB;
+class TransportDetailsPB;
+
 class MonoDelta;
 class MonoTime;
 class Sockaddr;
@@ -161,6 +164,10 @@ class Socket {
   // keepalive probes.
   Status SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries);
 
+  // Fill in 'pb' with statistics on this TCP socket. On Linux, these are
+  // metrics reported by getsockopt(TCP_INFO) plus the number of bytes in the
+  // socket's send and receive queues. Statistics that can't be retrieved are
+  // left unset in 'pb'.
+  Status GetStats(SocketStatsPB* pb) const;
+
+  // Fill in 'pb' with transport-level details of this socket, e.g. the TCP
+  // maximum segment size (TCP_MAXSEG). Subclasses may override this to report
+  // additional details; for example, TlsSocket adds the negotiated TLS
+  // protocol and cipher suite.
+  virtual Status GetTransportDetails(TransportDetailsPB* pb) const;
+
  private:
   // Called internally from SetSend/RecvTimeout().
   Status SetTimeout(int opt, const char* optname, const MonoDelta& timeout);
diff --git a/src/kudu/util/net/socket_info.proto b/src/kudu/util/net/socket_info.proto
new file mode 100644
index 000000000..c87d5db47
--- /dev/null
+++ b/src/kudu/util/net/socket_info.proto
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+// The SocketStatsPB message is used to report on socket-level information
+// for TCP connections (Linux-only). Essentially, the message contains some
+// metrics and counters from the Linux-specific 'tcp_info' structure defined
+// in /usr/include/linux/tcp.h. For more information on TCP on Linux, see
+// http://man7.org/linux/man-pages/man7/tcp.7.html
+//
+// Socket::GetStats() leaves a field unset when the corresponding value isn't
+// available (e.g. an older kernel doesn't report it).
+message SocketStatsPB {
+  // Smoothed round-trip time estimate and its variance (tcp_info's tcpi_rtt
+  // and tcpi_rttvar), in microseconds.
+  optional uint32 rtt = 1;
+  optional uint32 rttvar = 2;
+  // Sender congestion window, in number of MSS-sized packets.
+  optional uint32 snd_cwnd = 3;
+  // Total number of retransmitted packets.
+  optional uint32 total_retrans = 4;
+
+  // Pacing-related metrics (tcpi_pacing_rate and tcpi_max_pacing_rate).
+  optional uint64 pacing_rate = 5;
+  optional uint64 max_pacing_rate = 6;
+
+  // Total bytes ACKed by the remote peer, and total bytes received.
+  optional uint64 bytes_acked = 7;
+  optional uint64 bytes_received = 8;
+  // Number of segments sent and received.
+  optional uint32 segs_out = 9;
+  optional uint32 segs_in = 10;
+
+  // Number of bytes in the socket's send queue (ioctl(TIOCOUTQ)) and
+  // receive queue (ioctl(FIONREAD)).
+  optional uint64 send_queue_bytes = 11;
+  optional uint64 receive_queue_bytes = 12;
+
+  // Calculated sender throughput, in bytes per second:
+  // snd_cwnd * MSS * 1000000 / rtt (set only when all of these are non-zero).
+  optional int64 send_bytes_per_sec = 13;
+}
+
+// Transport-related information for a socket.
+message TransportDetailsPB {
+
+  // TCP-specific details.
+  message TcpDetails {
+    // Maximum segment size for the packets: this directly maps into the
+    // TCP_MAXSEG socket option.
+    optional int32 max_segment_size = 1;
+  }
+
+  // TLS-specific details.
+  //
+  // NOTE: TLS/SSL doesn't map nicely into a single layer of the TCP/IP or
+  // the OSI model, but intuitively that's something related to the transport.
+  message TlsDetails {
+    // The name of the TLS protocol negotiated to protect the connection
+    // (e.g. TLSv1.3).
+    optional string protocol = 1;
+
+    // Description of the TLS cipher suite used.
+    optional string cipher_suite = 2;
+  }
+
+  // Filled in by Socket::GetTransportDetails().
+  optional TcpDetails tcp = 1;
+  // In Kudu's C++ code, filled in by TlsSocket::GetTransportDetails(), i.e.
+  // only for TLS-protected sockets.
+  optional TlsDetails tls = 2;
+}

Reply via email to