This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new f15a1ca54 [net] DiagnosticSocket wrapper for sock_diag API f15a1ca54 is described below commit f15a1ca5476b256671c2369dd4d9faeccfee6b7c Author: Alexey Serbin <ale...@apache.org> AuthorDate: Fri Jan 12 12:26:25 2024 -0800 [net] DiagnosticSocket wrapper for sock_diag API This patch introduces DiagnosticSocket wrapper for sock_diag() netlink subsystem [1][2]. The primary intention behind DiagnosticSocket is to fetch information on the RX queue size for a listening IPv4 TCP socket. A follow-up patch will use this new facility to populate a new server-level metric: that's to track connection request backlog for a Kudu server's RPC socket (a.k.a. listened socket backlog, pending connections queue, etc.). Since netlink is a Linux-specific API/subsystem, the DiagnosticSocket API is available only on Linux, correspondingly. This patch includes a few unit tests to provide basic coverage for the newly introduced functionality. [1] https://man7.org/linux/man-pages/man7/sock_diag.7.html [2] https://man7.org/linux/man-pages/man7/netlink.7.html Change-Id: I4a4f37ca4b0df8ca543ec383d89766cbf1b1e526 Reviewed-on: http://gerrit.cloudera.org:8080/20892 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Yingchun Lai <laiyingc...@apache.org> --- src/kudu/util/CMakeLists.txt | 5 + src/kudu/util/net/diagnostic_socket-test.cc | 165 ++++++++++++++ src/kudu/util/net/diagnostic_socket.cc | 328 ++++++++++++++++++++++++++++ src/kudu/util/net/diagnostic_socket.h | 121 ++++++++++ 4 files changed, 619 insertions(+) diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt index f4d8e5a09..5a4940131 100644 --- a/src/kudu/util/CMakeLists.txt +++ b/src/kudu/util/CMakeLists.txt @@ -267,6 +267,10 @@ set(UTIL_SRCS zlib.cc ) +if(NOT APPLE) + set(UTIL_SRCS ${UTIL_SRCS} net/diagnostic_socket.cc) +endif() + if(NOT NO_TESTS) set(UTIL_SRCS ${UTIL_SRCS} test_graph.cc) endif() @@ -592,6 +596,7 @@ ADD_KUDU_TEST(yamlreader-test) if (NOT APPLE) ADD_KUDU_TEST(minidump-test) + ADD_KUDU_TEST(net/diagnostic_socket-test) endif() ####################################### diff --git a/src/kudu/util/net/diagnostic_socket-test.cc b/src/kudu/util/net/diagnostic_socket-test.cc new file mode 100644 index 000000000..9f192a45f --- /dev/null +++ b/src/kudu/util/net/diagnostic_socket-test.cc @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/net/diagnostic_socket.h" + +#include <netinet/in.h> + +#include <cstddef> +#include <cstdint> +#include <string> +#include <vector> + +#include <gtest/gtest.h> + +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +using std::string; +using std::vector; + +namespace kudu { + +class DiagnosticSocketTest : public KuduTest { + protected: + Socket listener_; + Sockaddr listen_addr_; + + Status BindAndListen(const string& addr_str, uint16_t port, int listen_backlog = 1) { + Sockaddr address; + RETURN_NOT_OK(address.ParseString(addr_str, port)); + return BindAndListen(address, listen_backlog); + } + + Status BindAndListen(const Sockaddr& address, int listen_backlog) { + RETURN_NOT_OK(listener_.Init(address.family(), 0)); + RETURN_NOT_OK(listener_.BindAndListen(address, listen_backlog)); + return listener_.GetSocketAddress(&listen_addr_); + } +}; + +TEST_F(DiagnosticSocketTest, Basic) { + DiagnosticSocket ds; + // Make sure it's possible to create a netlink socket. + ASSERT_OK(ds.Init()); + // Call Close() on the socket explicitly. + ASSERT_OK(ds.Close()); +} + +TEST_F(DiagnosticSocketTest, ListeningSocket) { + constexpr const char* const kIpAddr = "127.254.254.254"; + constexpr uint16_t kPort = 56789; + constexpr int kListenBacklog = 8; + + ASSERT_OK(BindAndListen(kIpAddr, kPort, kListenBacklog)); + + DiagnosticSocket ds; + ASSERT_OK(ds.Init()); + DiagnosticSocket::TcpSocketInfo info; + ASSERT_OK(ds.Query(listener_, &info)); + + // Make sure the result matches the input parameters. + ASSERT_EQ(listen_addr_.ipv4_addr().sin_addr.s_addr, info.src_addr); + ASSERT_EQ(INADDR_ANY, info.dst_addr); + ASSERT_EQ(kPort, ntohs(info.src_port)); + ASSERT_EQ(0, ntohs(info.dst_port)); + ASSERT_EQ(DiagnosticSocket::SS_LISTEN, info.state); + + // TX queue size for a listening socket is the size of the backlog queue. + ASSERT_EQ(kListenBacklog, info.tx_queue_size); + + // Nothing is connecting to the listen port: no pending connections expected. + ASSERT_EQ(0, info.rx_queue_size); +} + +TEST_F(DiagnosticSocketTest, SimplePattern) { + // Open a socket, bind and listen, and then close it. This is just to make + // sure the socket has valid address, but there is no open socket at the + // specified address. + constexpr const char* const kIpAddr = "127.254.254.254"; + constexpr uint16_t kPort = 56789; + constexpr int kListenBacklog = 5; + ASSERT_OK(BindAndListen(kIpAddr, kPort, kListenBacklog)); + + const auto& src_addr = listen_addr_; + const auto& dst_addr = Sockaddr::Wildcard(); + const vector<DiagnosticSocket::SocketState> socket_states{ DiagnosticSocket::SS_LISTEN }; + + DiagnosticSocket ds; + ASSERT_OK(ds.Init()); + + // Use a pattern to match only the listened server socket. + { + vector<DiagnosticSocket::TcpSocketInfo> info; + // The query should return success. + ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info)); + ASSERT_EQ(1, info.size()); + const auto& entry = info.front(); + + // Make sure the result matches the input parameters. + ASSERT_EQ(src_addr.ipv4_addr().sin_addr.s_addr, entry.src_addr); + ASSERT_EQ(INADDR_ANY, entry.dst_addr); + ASSERT_EQ(kPort, ntohs(entry.src_port)); + ASSERT_EQ(0, ntohs(entry.dst_port)); + ASSERT_EQ(DiagnosticSocket::SS_LISTEN, entry.state); + + // Verify the expected statistics on the server socket. + ASSERT_EQ(0, entry.rx_queue_size); // no pending connections + ASSERT_EQ(kListenBacklog, entry.tx_queue_size); + } + + // Use a pattern to match any IPv4 TCP socket. + { + const auto& addr_wildcard = Sockaddr::Wildcard(); + const auto& state_wildcard = DiagnosticSocket::SocketStateWildcard(); + vector<DiagnosticSocket::TcpSocketInfo> info; + // The query should return success. + ASSERT_OK(ds.Query(addr_wildcard, addr_wildcard, state_wildcard, &info)); + ASSERT_GE(info.size(), 1); + + // Make sure the server's socket is one of the reported ones. + size_t matched_entries = 0; + for (const auto& entry : info) { + if (src_addr.ipv4_addr().sin_addr.s_addr != entry.src_addr || + INADDR_ANY != entry.dst_addr || + kPort != ntohs(entry.src_port) || + 0 != ntohs(entry.dst_port) || + DiagnosticSocket::SS_LISTEN != entry.state) { + continue; + } + ++matched_entries; + } + ASSERT_EQ(1, matched_entries); + } + + // Close the socket; the socket's address in listen_addr_ still valid. + ASSERT_OK(listener_.Close()); + + { + vector<DiagnosticSocket::TcpSocketInfo> info; + // The query should return success. + ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info)); + // However, the list of matching sockets should be empty since the socket + // that could match the pattern has been just closed. + ASSERT_TRUE(info.empty()); + } +} + +} // namespace kudu diff --git a/src/kudu/util/net/diagnostic_socket.cc b/src/kudu/util/net/diagnostic_socket.cc new file mode 100644 index 000000000..6a2f6a7a6 --- /dev/null +++ b/src/kudu/util/net/diagnostic_socket.cc @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/net/diagnostic_socket.h" + +#include <linux/inet_diag.h> +#include <linux/netlink.h> +#include <linux/sock_diag.h> +#include <linux/types.h> + +#include <netinet/in.h> +#include <sys/socket.h> +#include <unistd.h> + +#include <cerrno> +#include <ostream> +#include <string> +#include <utility> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/errno.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" + +using std::string; +using std::vector; +using strings::Substitute; + +namespace kudu { + +const vector<DiagnosticSocket::SocketState>& DiagnosticSocket::SocketStateWildcard() { + static const vector<DiagnosticSocket::SocketState> kSocketStateWildcard { + SS_ESTABLISHED, + SS_SYN_SENT, + SS_SYN_RECV, + SS_FIN_WAIT1, + SS_FIN_WAIT2, + SS_TIME_WAIT, + SS_CLOSE, + SS_CLOSE_WAIT, + SS_LAST_ACK, + SS_LISTEN, + SS_CLOSING, + }; + return kSocketStateWildcard; +} + +DiagnosticSocket::DiagnosticSocket() + : fd_(-1) { +} + +DiagnosticSocket::~DiagnosticSocket() { + WARN_NOT_OK(Close(), "errors on closing diagnostic socket"); +} + +Status DiagnosticSocket::Init() { + auto fd = ::socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG); + if (fd < 0) { + int err = errno; + return Status::RuntimeError("unable to open diagnostic socket", + ErrnoToString(err), err); + } + fd_ = fd; + + return Status::OK(); +} + +Status DiagnosticSocket::Close() { + if (fd_ < 0) { + return Status::OK(); + } + int ret; + RETRY_ON_EINTR(ret, ::close(fd_)); + if (ret < 0) { + int err = errno; + return Status::IOError("close error", ErrnoToString(err), err); + } + fd_ = -1; + return Status::OK(); +} + +Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr, + const Sockaddr& socket_dst_addr, + const vector<SocketState>& socket_states, + vector<TcpSocketInfo>* info) { + DCHECK_GE(fd_, 0) << "requires calling Init() first"; + DCHECK(info); + + uint32_t socket_states_bitmask = 0; + for (auto state : socket_states) { + socket_states_bitmask |= (1U << state); + } + + RETURN_NOT_OK(SendRequest( + socket_src_addr, socket_dst_addr, socket_states_bitmask)); + vector<TcpSocketInfo> result; + RETURN_NOT_OK(ReceiveResponse(&result)); + *info = std::move(result); + return Status::OK(); +} + +Status DiagnosticSocket::Query(const Socket& socket, TcpSocketInfo* info) { + DCHECK_GE(fd_, 0) << "requires calling Init() first"; + DCHECK(info); + + RETURN_NOT_OK(SendRequest(socket)); + vector<TcpSocketInfo> result; + RETURN_NOT_OK(ReceiveResponse(&result)); + if (result.empty()) { + return Status::NotFound("no matching IPv4 TCP socket found"); + } + if (PREDICT_FALSE(result.size() > 1)) { + return Status::InvalidArgument("socket address is ambiguous"); + } + + *info = result.front(); + return Status::OK(); +} + +// Send query about the specified socket. +Status DiagnosticSocket::SendRequest(const Socket& socket) const { + DCHECK_GE(fd_, 0); + + static constexpr const char* const kNonIpErrMsg = + "netlink diagnostics is currently supported only on IPv4 TCP sockets"; + + Sockaddr src_addr; + RETURN_NOT_OK(socket.GetSocketAddress(&src_addr)); + if (PREDICT_FALSE(!src_addr.is_ip())) { + return Status::NotSupported(kNonIpErrMsg); + } + + Sockaddr dst_addr; + auto s = socket.GetPeerAddress(&dst_addr); + if (s.ok()) { + if (PREDICT_FALSE(!dst_addr.is_ip())) { + return Status::NotSupported(kNonIpErrMsg); + } + } else { + if (PREDICT_TRUE(s.IsNetworkError() && s.posix_code() == ENOTCONN)) { + // Assuming it's a listened socket if there isn't a peer at the other side. + dst_addr = Sockaddr::Wildcard(); + } else { + return s; + } + } + + const uint32_t socket_state_bitmask = + dst_addr.IsWildcard() ? (1U << SS_LISTEN) : (1U << SS_ESTABLISHED); + return SendRequest(src_addr, dst_addr, socket_state_bitmask); +} + +Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr, + const Sockaddr& socket_dst_addr, + uint32_t socket_states_bitmask) const { + DCHECK_GE(fd_, 0); + + const in_addr& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr; + const auto src_port = socket_src_addr.port(); + const in_addr& dst_ipv4 = socket_dst_addr.ipv4_addr().sin_addr; + const auto dst_port = socket_dst_addr.port(); + + constexpr uint32_t kWildcard = static_cast<uint32_t>(-1); + // All values in inet_diag_sockid are in network byte order. + const struct inet_diag_sockid sock_id = { + .idiag_sport = htons(src_port), + .idiag_dport = htons(dst_port), + .idiag_src = { src_ipv4.s_addr, 0, 0, 0, }, + .idiag_dst = { dst_ipv4.s_addr, 0, 0, 0, }, + .idiag_if = kWildcard, + .idiag_cookie = { kWildcard, kWildcard }, + }; + + struct TcpSocketRequest { + struct nlmsghdr nlh; + struct inet_diag_req_v2 idr; + } req = { + .nlh = { + .nlmsg_len = sizeof(req), + .nlmsg_type = SOCK_DIAG_BY_FAMILY, + .nlmsg_flags = NLM_F_REQUEST | NLM_F_MATCH, + }, + .idr = { + .sdiag_family = AF_INET, + .sdiag_protocol = IPPROTO_TCP, + .idiag_ext = INET_DIAG_MEMINFO, + .pad = 0, + .idiag_states = socket_states_bitmask, + .id = sock_id, + } + }; + + struct iovec iov = { + .iov_base = &req, + .iov_len = sizeof(req), + }; + struct sockaddr_nl nladdr = { + .nl_family = AF_NETLINK + }; + struct msghdr msg = { + .msg_name = &nladdr, + .msg_namelen = sizeof(nladdr), + .msg_iov = &iov, + .msg_iovlen = 1, + }; + + int rc = -1; + RETRY_ON_EINTR(rc, ::sendmsg(fd_, &msg, 0)); + if (rc < 0) { + int err = errno; + return Status::NetworkError("semdmsg() failed", ErrnoToString(err), err); + } + return Status::OK(); +} + +Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const { + DCHECK_GE(fd_, 0); + + uint8_t buf[8192]; + struct iovec iov = { + .iov_base = buf, + .iov_len = sizeof(buf) + }; + + while (true) { + struct sockaddr_nl nladdr = {}; + struct msghdr msg = { + .msg_name = &nladdr, + .msg_namelen = sizeof(nladdr), + .msg_iov = &iov, + .msg_iovlen = 1 + }; + + ssize_t ret = -1; + RETRY_ON_EINTR(ret, ::recvmsg(fd_, &msg, 0)); + if (PREDICT_FALSE(ret < 0)) { + int err = errno; + return Status::IOError("recvmsg()", ErrnoToString(err), err); + } + if (ret == 0) { + // End of stream. + return Status::OK(); + } + + const struct nlmsghdr* h = reinterpret_cast<const struct nlmsghdr*>(buf); + if (PREDICT_FALSE(!NLMSG_OK(h, ret))) { + return Status::Corruption( + Substitute("unexpected netlink response size $0", ret)); + } + + if (PREDICT_FALSE(nladdr.nl_family != AF_NETLINK)) { + return Status::Corruption(Substitute( + "$0: unexpected address family", static_cast<uint32_t>(nladdr.nl_family))); + } + + for (; NLMSG_OK(h, ret); h = NLMSG_NEXT(h, ret)) { + if (h->nlmsg_type == NLMSG_DONE) { + return Status::OK(); + } + if (PREDICT_FALSE(h->nlmsg_type == NLMSG_ERROR)) { + // Below, the NLMSG_DATA(h) macro is expanded and C-style casts replaced + // with reinterpret_cast<>. + const struct nlmsgerr* errdata = reinterpret_cast<const struct nlmsgerr*>( + reinterpret_cast<const uint8_t*>(h) + NLMSG_LENGTH(0)); + if (PREDICT_FALSE(h->nlmsg_len < NLMSG_LENGTH(sizeof(*errdata)))) { + return Status::Corruption("NLMSG error message is too short"); + } + const int err = -errdata->error; + return Status::RuntimeError("netlink error", ErrnoToString(err), err); + } + + if (PREDICT_FALSE(h->nlmsg_type != SOCK_DIAG_BY_FAMILY)) { + return Status::Corruption(Substitute("$0: unexpected netlink message type", + static_cast<uint32_t>(h->nlmsg_type))); + } + + // Below, the NLMSG_DATA(h) macro is expanded and C-style casts replaced + // with reinterpret_cast<>. + const struct inet_diag_msg* msg_data = reinterpret_cast<const struct inet_diag_msg*>( + reinterpret_cast<const uint8_t*>(h) + NLMSG_LENGTH(0)); + const uint32_t msg_size = h->nlmsg_len; + if (PREDICT_FALSE(msg_size < NLMSG_LENGTH(sizeof(*msg_data)))) { + return Status::Corruption(Substitute( + "$0: netlink response is too short", msg_size)); + } + // Only IPv4 addresses are expected due to the query pattern. + if (PREDICT_FALSE(msg_data->idiag_family != AF_INET)) { + return Status::Corruption(Substitute( + "$0: unexpected address family in netlink response", + static_cast<uint32_t>(msg_data->idiag_family))); + } + + DCHECK_LE(SocketState::SS_ESTABLISHED, msg_data->idiag_state); + DCHECK_GE(SocketState::SS_CLOSING, msg_data->idiag_state); + + TcpSocketInfo info; + info.state = static_cast<SocketState>(msg_data->idiag_state); + info.src_addr = msg_data->id.idiag_src[0]; // IPv4 address, network byte order + info.dst_addr = msg_data->id.idiag_dst[0]; // IPv4 address, network byte order + info.src_port = msg_data->id.idiag_sport; + info.dst_port = msg_data->id.idiag_dport; + info.rx_queue_size = msg_data->idiag_rqueue; + info.tx_queue_size = msg_data->idiag_wqueue; + result->emplace_back(info); + } + } + return Status::OK(); +} + +} // namespace kudu diff --git a/src/kudu/util/net/diagnostic_socket.h b/src/kudu/util/net/diagnostic_socket.h new file mode 100644 index 000000000..bee3ba1b1 --- /dev/null +++ b/src/kudu/util/net/diagnostic_socket.h @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> +#include <vector> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" +#include "kudu/util/status.h" + +namespace kudu { + +class Sockaddr; +class Socket; + +// A wrapper around Linux-specific sock_diag() API [1] based on the +// netlink facility [2] to fetch information on IPv4 TCP sockets. +// +// [1] https://man7.org/linux/man-pages/man7/sock_diag.7.html +// [2] https://man7.org/linux/man-pages/man7/netlink.7.html +class DiagnosticSocket final { + public: + // Enum for the socket state. This is modeled after the corresponding + // TCP_-prefixed enum in /usr/include/netinet/tcp.h with exact value mapping. + // This enum is introduced to decouple the netinet/tcp.h header and the API + // of this class. + enum SocketState { + SS_UNKNOWN = 0, + SS_ESTABLISHED, + SS_SYN_SENT, + SS_SYN_RECV, + SS_FIN_WAIT1, + SS_FIN_WAIT2, + SS_TIME_WAIT, + SS_CLOSE, + SS_CLOSE_WAIT, + SS_LAST_ACK, + SS_LISTEN, + SS_CLOSING, + SS_MAX, + }; + + // Diagnostic information on a TCP IPv4 socket. That's a subset of the + // information available via the netlink data structures. + // + // TODO(aserbin): if using this API more broadly than fetching information on + // a single socket, consider replacing { addr, port } pairs for + // the source and the destination with Sockaddr class fields. + struct TcpSocketInfo { + SocketState state; // current state of the socket + uint32_t src_addr; // IPv4 source address (network byte order) + uint32_t dst_addr; // IPv4 destination address (network byte order) + uint16_t src_port; // source port number (network byte order) + uint16_t dst_port; // destination port number (network byte order) + uint32_t rx_queue_size; // RX queue size + uint32_t tx_queue_size; // TX queue size + }; + + // Return wildcard for all the available socket states. + static const std::vector<SocketState>& SocketStateWildcard(); + + // Construct an object. + DiagnosticSocket(); + + // Close the diagnostic socket. Errors will be logged, but ignored. + ~DiagnosticSocket(); + + // Open the diagnostic socket of the NETLINK_SOCK_DIAG protocol in the + // AF_NETLINK domain, so it's possible to fetch the requested information + // from the kernel using the netlink facility via the API of this class. + Status Init() WARN_UNUSED_RESULT; + + // Close the Socket, checking for errors. + Status Close(); + + // Get diagnostic information on IPv4 TCP sockets of the specified states + // having the specified source and the destination address. Wildcard addresses + // are supported. + Status Query(const Sockaddr& socket_src_addr, + const Sockaddr& socket_dst_addr, + const std::vector<SocketState>& socket_states, + std::vector<TcpSocketInfo>* info); + + // Get diagnostic information on the specified socket. This is a handy + // shortcut to the Query() method above for a single active socket in the + // SS_ESTABLISHED or SS_LISTEN. + Status Query(const Socket& socket, TcpSocketInfo* info); + + private: + // Build and send netlink request, writing it into the diagnostic socket. + Status SendRequest(const Socket& socket) const; + Status SendRequest(const Sockaddr& socket_src_addr, + const Sockaddr& socket_dst_addr, + uint32_t socket_states_bitmask) const; + + // Receive response for a request sent by a method above. + Status ReceiveResponse(std::vector<TcpSocketInfo>* result) const; + + // File descriptor of the diagnostic socket (AF_NETLINK domain). + int fd_; + + DISALLOW_COPY_AND_ASSIGN(DiagnosticSocket); +}; + +} // namespace kudu