This is an automated email from the ASF dual-hosted git repository. achennaka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 19bade3d6 [rpc] modernize code a bit 19bade3d6 is described below commit 19bade3d6ed8422591d32f48df64d3b4cdf5dc55 Author: Alexey Serbin <ale...@apache.org> AuthorDate: Fri Dec 15 19:57:38 2023 -0800 [rpc] modernize code a bit Since I'm updating the code in src/kudurpc/connection.{h,cc} and src/kudu/util/net/socket.{h,cc} in follow-up changelists, I went ahead and updated the code to be style-compliant, modernized it, and did other unsorted improvements before introducing the new functionality. Change-Id: I2634591426c3e107e4d11b661ff62e5cde8a7570 Reviewed-on: http://gerrit.cloudera.org:8080/20815 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Mahesh Reddy <mre...@cloudera.com> Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com> --- src/kudu/rpc/connection.cc | 288 ++++++++++++++++++++++++-------------------- src/kudu/rpc/connection.h | 27 ++--- src/kudu/rpc/messenger.cc | 18 ++- src/kudu/rpc/messenger.h | 6 +- src/kudu/rpc/proxy.cc | 54 +++++---- src/kudu/rpc/reactor.cc | 23 ++-- src/kudu/rpc/reactor.h | 8 +- src/kudu/util/net/socket.cc | 94 ++++++++++----- src/kudu/util/net/socket.h | 25 ++-- 9 files changed, 303 insertions(+), 240 deletions(-) diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc index ddf8dd3e5..c6727b805 100644 --- a/src/kudu/rpc/connection.cc +++ b/src/kudu/rpc/connection.cc @@ -19,10 +19,14 @@ #include <netinet/in.h> #include <netinet/tcp.h> -#include <string.h> +#include <sys/socket.h> +#ifdef __linux__ +#include <sys/ioctl.h> +#endif #include <algorithm> #include <cerrno> +#include <cstddef> #include <iostream> #include <memory> #include <set> @@ -53,15 +57,11 @@ #include "kudu/util/slice.h" #include "kudu/util/status.h" -#include <sys/socket.h> -#ifdef __linux__ -#include <sys/ioctl.h> -#endif - using kudu::security::TlsSocket; using std::includes; using std::set; using std::shared_ptr; +using std::string; using std::unique_ptr; using strings::Substitute; @@ -199,8 +199,8 @@ struct tcp_info { /// /// Connection /// -Connection::Connection(ReactorThread *reactor_thread, - Sockaddr remote, +Connection::Connection(ReactorThread* reactor_thread, + const Sockaddr& remote, unique_ptr<Socket> socket, Direction direction, CredentialsPolicy policy) @@ -231,7 +231,7 @@ Status Connection::SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_re void Connection::EpollRegister(ev::loop_ref& loop) { DCHECK(reactor_thread_->IsCurrentThread()); - DVLOG(4) << "Registering connection for epoll: " << ToString(); + DVLOG(4) << Substitute("registering connection for epoll: $0", ToString()); write_io_.set(loop); write_io_.set(socket_->GetFd(), ev::WRITE); write_io_.set<Connection, &Connection::WriteHandler>(this); @@ -259,7 +259,7 @@ Connection::~Connection() { bool Connection::Idle() const { DCHECK(reactor_thread_->IsCurrentThread()); // check if we're in the middle of receiving something - InboundTransfer *transfer = inbound_.get(); + InboundTransfer* transfer = inbound_.get(); if (transfer && (transfer->TransferStarted())) { return false; } @@ -284,7 +284,7 @@ bool Connection::Idle() const { return true; } -void Connection::Shutdown(const Status &status, +void Connection::Shutdown(const Status& status, unique_ptr<ErrorStatusPB> rpc_error) { DCHECK(reactor_thread_->IsCurrentThread()); shutdown_status_ = status.CloneAndPrepend("RPC connection failed"); @@ -292,16 +292,17 @@ void Connection::Shutdown(const Status &status, if (inbound_ && inbound_->TransferStarted()) { double secs_since_active = (reactor_thread_->cur_time() - last_activity_time_).ToSeconds(); - LOG(WARNING) << "Shutting down " << ToString() - << " with pending inbound data (" - << inbound_->StatusAsString() << ", last active " - << HumanReadableElapsedTime::ToShortString(secs_since_active) - << " ago, status=" << status.ToString() << ")"; + LOG(WARNING) << Substitute( + "shutting down $0 with pending inbound data: " + "$1; last active $2 ago: status $3", + ToString(), + inbound_->StatusAsString(), + HumanReadableElapsedTime::ToShortString(secs_since_active), + status.ToString()); } // Clear any calls which have been sent and were awaiting a response. - for (const car_map_t::value_type &v : awaiting_response_) { - CallAwaitingResponse *c = v.second; + for (const auto& [_, c] : awaiting_response_) { if (c->call) { // Make sure every awaiting call receives the error info, if any. unique_ptr<ErrorStatusPB> error; @@ -320,7 +321,7 @@ void Connection::Shutdown(const Status &status, // Clear any outbound transfers. while (!outbound_transfers_.empty()) { - OutboundTransfer *t = &outbound_transfers_.front(); + auto* t = &outbound_transfers_.front(); outbound_transfers_.pop_front(); delete t; } @@ -336,14 +337,14 @@ void Connection::Shutdown(const Status &status, void Connection::QueueOutbound(unique_ptr<OutboundTransfer> transfer) { DCHECK(reactor_thread_->IsCurrentThread()); - if (!shutdown_status_.ok()) { + if (PREDICT_FALSE(!shutdown_status_.ok())) { // If we've already shut down, then we just need to abort the // transfer rather than bothering to queue it. transfer->Abort(shutdown_status_); return; } - DVLOG(3) << "Queueing transfer: " << transfer->HexDump(); + DVLOG(3) << Substitute("queueing transfer: $0", transfer->HexDump()); outbound_transfers_.push_back(*transfer.release()); @@ -360,14 +361,16 @@ Connection::CallAwaitingResponse::~CallAwaitingResponse() { DCHECK(conn->reactor_thread_->IsCurrentThread()); } -void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int revents) { +void Connection::CallAwaitingResponse::HandleTimeout(ev::timer& watcher, + int /*revents*/) { if (remaining_timeout > 0) { - if (watcher.remaining() < -1.0) { - LOG(WARNING) << "RPC call timeout handler was delayed by " - << -watcher.remaining() << "s! This may be due to a process-wide " - << "pause such as swapping, logging-related delays, or allocator lock " - << "contention. Will allow an additional " - << remaining_timeout << "s for a response."; + const auto rem = watcher.remaining(); + if (PREDICT_FALSE(rem < -1.0)) { + LOG(WARNING) << Substitute( + "RPC call timeout handler was delayed by $0s: this may be due " + "to a process-wide pause such as swapping, logging-related delays, " + "or allocator lock contention. Will allow extra $1s for a response", + rem, remaining_timeout); } watcher.set(remaining_timeout, 0); @@ -379,7 +382,7 @@ void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int rev conn->HandleOutboundCallTimeout(this); } -void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) { +void Connection::HandleOutboundCallTimeout(CallAwaitingResponse* car) { DCHECK(reactor_thread_->IsCurrentThread()); if (!car->call) { // The RPC may have been cancelled before the timeout was hit. @@ -407,7 +410,7 @@ void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) { // already timed out. } -void Connection::CancelOutboundCall(const shared_ptr<OutboundCall> &call) { +void Connection::CancelOutboundCall(const shared_ptr<OutboundCall>& call) { CallAwaitingResponse* car = FindPtrOrNull(awaiting_response_, call->call_id()); if (car != nullptr) { // car->call may be NULL if the call has timed out already. @@ -423,7 +426,7 @@ Status Connection::GetLocalAddress(Sockaddr* addr) const { } // Inject a cancellation when 'call' is in state 'FLAGS_rpc_inject_cancellation_state'. -void inline Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall> &call) { +void inline Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall>& call) { if (PREDICT_FALSE(call->ShouldInjectCancellation())) { reactor_thread_->reactor()->messenger()->QueueCancellation(call); } @@ -435,7 +438,7 @@ void inline Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall> & struct CallTransferCallbacks : public TransferCallbacks { public: explicit CallTransferCallbacks(shared_ptr<OutboundCall> call, - Connection *conn) + Connection* conn) : call_(std::move(call)), conn_(conn) {} void NotifyTransferFinished() override { @@ -452,9 +455,9 @@ struct CallTransferCallbacks : public TransferCallbacks { delete this; } - void NotifyTransferAborted(const Status &status) override { - VLOG(1) << "Transfer of RPC call " << call_->ToString() << " aborted: " - << status.ToString(); + void NotifyTransferAborted(const Status& status) override { + VLOG(1) << Substitute( + "transfer of $0 aborted: $1", call_->ToString(), status.ToString()); delete this; } @@ -484,7 +487,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) { DCHECK(!call->cancellation_requested()); // Assign the call ID. - int32_t call_id = GetNextCallId(); + const int32_t call_id = GetNextCallId(); call->set_call_id(call_id); // Serialize the actual bytes to be put on the wire. @@ -501,7 +504,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) { car->call = call; // Set up the timeout timer. - const MonoDelta &timeout = call->controller()->timeout(); + const auto& timeout = call->controller()->timeout(); if (timeout.Initialized()) { reactor_thread_->RegisterTimeout(&car->timeout_timer); car->timeout_timer.set<CallAwaitingResponse, // NOLINT(*) @@ -544,7 +547,7 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) { car->timeout_timer.start(); } - TransferCallbacks *cb = new CallTransferCallbacks(std::move(call), this); + TransferCallbacks* cb = new CallTransferCallbacks(std::move(call), this); awaiting_response_[call_id] = car.release(); QueueOutbound(unique_ptr<OutboundTransfer>( OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, cb))); @@ -553,18 +556,17 @@ void Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) { // Callbacks for sending an RPC call response from the server. // This takes ownership of the InboundCall object so that, once it has // been responded to, we can free up all of the associated memory. -struct ResponseTransferCallbacks : public TransferCallbacks { +struct ResponseTransferCallbacks final : public TransferCallbacks { public: - ResponseTransferCallbacks(unique_ptr<InboundCall> call, - Connection *conn) : - call_(std::move(call)), - conn_(conn) - {} + ResponseTransferCallbacks(unique_ptr<InboundCall> call, Connection* conn) + : call_(std::move(call)), + conn_(conn) { + } - ~ResponseTransferCallbacks() { + ~ResponseTransferCallbacks() override { // Remove the call from the map. - InboundCall *call_from_map = EraseKeyReturnValuePtr( - &conn_->calls_being_handled_, call_->call_id()); + auto* call_from_map = EraseKeyReturnValuePtr( + &conn_->calls_being_handled_, call_->call_id()); DCHECK_EQ(call_from_map, call_.get()); } @@ -573,38 +575,38 @@ struct ResponseTransferCallbacks : public TransferCallbacks { } void NotifyTransferAborted(const Status& /*status*/) override { - LOG(WARNING) << "Connection torn down before " << - call_->ToString() << " could send its response"; + LOG(WARNING) << Substitute( + "$0 torn down before $1 could send its response", + conn_->ToString(), call_->ToString()); delete this; } private: unique_ptr<InboundCall> call_; - Connection *conn_; + Connection* conn_; }; // Reactor task which puts a transfer on the outbound transfer queue. class QueueTransferTask : public ReactorTask { public: - QueueTransferTask(unique_ptr<OutboundTransfer> transfer, - Connection *conn) - : transfer_(std::move(transfer)), - conn_(conn) - {} + QueueTransferTask(unique_ptr<OutboundTransfer> transfer, Connection* conn) + : transfer_(std::move(transfer)), + conn_(conn) { + } void Run(ReactorThread* /*thr*/) override { conn_->QueueOutbound(std::move(transfer_)); delete this; } - void Abort(const Status &status) override { + void Abort(const Status& status) override { transfer_->Abort(status); delete this; } private: unique_ptr<OutboundTransfer> transfer_; - Connection *conn_; + Connection* conn_; }; void Connection::QueueResponseForCall(unique_ptr<InboundCall> call) { @@ -621,15 +623,15 @@ void Connection::QueueResponseForCall(unique_ptr<InboundCall> call) { TransferPayload tmp_slices; call->SerializeResponseTo(&tmp_slices); - TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this); + TransferCallbacks* cb = new ResponseTransferCallbacks(std::move(call), this); // After the response is sent, can delete the InboundCall object. // We set a dummy call ID and required feature set, since these are not needed // when sending responses. unique_ptr<OutboundTransfer> t( OutboundTransfer::CreateForCallResponse(tmp_slices, cb)); - QueueTransferTask *task = new QueueTransferTask(std::move(t), this); - reactor_thread_->reactor()->ScheduleReactorTask(task); + reactor_thread_->reactor()->ScheduleReactorTask( + new QueueTransferTask(std::move(t), this)); } void Connection::set_confidential(bool is_confidential) { @@ -646,10 +648,10 @@ RpczStore* Connection::rpcz_store() { return reactor_thread_->reactor()->messenger()->rpcz_store(); } -void Connection::ReadHandler(ev::io &watcher, int revents) { +void Connection::ReadHandler(ev::io& /*watcher*/, int revents) { DCHECK(reactor_thread_->IsCurrentThread()); - DVLOG(3) << ToString() << " ReadHandler(revents=" << revents << ")"; + DVLOG(3) << Substitute("$0 ReadHandler(revents=$1)", ToString(), revents); if (revents & EV_ERROR) { reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() + ": ReadHandler encountered an error")); @@ -665,25 +667,34 @@ void Connection::ReadHandler(ev::io &watcher, int revents) { Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf); if (PREDICT_FALSE(!status.ok())) { if (status.posix_code() == ESHUTDOWN) { - VLOG(1) << ToString() << " shut down by remote end."; + VLOG(1) << Substitute("$0 shut down by remote end", ToString()); } else { - LOG(WARNING) << ToString() << " recv error: " << status.ToString(); + LOG(WARNING) << Substitute("$0 recv error: $1", + ToString(), status.ToString()); } reactor_thread_->DestroyConnection(this, status); return; } if (!inbound_->TransferFinished()) { - DVLOG(3) << ToString() << ": read is not yet finished yet."; + DVLOG(3) << Substitute("$0: read is not yet finished yet", ToString()); return; } - DVLOG(3) << ToString() << ": finished reading " << inbound_->data().size() << " bytes"; - - if (direction_ == CLIENT) { - HandleCallResponse(std::move(inbound_)); - } else if (direction_ == SERVER) { - HandleIncomingCall(std::move(inbound_)); - } else { - LOG(FATAL) << "Invalid direction: " << direction_; + DVLOG(3) << Substitute("$0: finished reading $1 bytes", + ToString(), inbound_->data().size()); + + switch (direction_) { + case CLIENT: + HandleCallResponse(std::move(inbound_)); + break; + + case SERVER: + HandleIncomingCall(std::move(inbound_)); + break; + + default: + LOG(DFATAL) << Substitute("$0: invalid direction", + static_cast<uint16_t>(direction_)); + break; } if (extra_buf.size() > 0) { @@ -699,19 +710,24 @@ void Connection::HandleIncomingCall(unique_ptr<InboundTransfer> transfer) { unique_ptr<InboundCall> call(new InboundCall(this)); Status s = call->ParseFrom(std::move(transfer)); - if (!s.ok()) { - LOG(WARNING) << ToString() << ": received bad data: " << s.ToString(); - // TODO: shutdown? probably, since any future stuff on this socket will be - // "unsynchronized" + if (PREDICT_FALSE(!s.ok())) { + LOG(WARNING) << Substitute("$0: received bad data: '$1'", + ToString(), s.ToString()); + // Shutting down down the connection since there is a high risk of receiving + // "unsynchronized" data on this socket after this error. + Shutdown(s); return; } - if (!InsertIfNotPresent(&calls_being_handled_, call->call_id(), call.get())) { - LOG(WARNING) << ToString() << ": received call ID " << call->call_id() << - " but was already processing this ID! Ignoring"; + if (PREDICT_FALSE(!InsertIfNotPresent(&calls_being_handled_, + call->call_id(), + call.get()))) { + LOG(WARNING) << Substitute( + "$0: received call ID $1 but was already processing this ID, ignoring", + ToString(), call->call_id()); reactor_thread_->DestroyConnection( - this, Status::RuntimeError("Received duplicate call id", - Substitute("$0", call->call_id()))); + this, Status::RuntimeError("Received duplicate call id", + Substitute("$0", call->call_id()))); return; } @@ -723,11 +739,12 @@ void Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) { unique_ptr<CallResponse> resp(new CallResponse); CHECK_OK(resp->ParseFrom(std::move(transfer))); - CallAwaitingResponse *car_ptr = - EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id()); + CallAwaitingResponse* car_ptr = EraseKeyReturnValuePtr( + &awaiting_response_, resp->call_id()); if (PREDICT_FALSE(car_ptr == nullptr)) { - LOG(WARNING) << ToString() << ": Got a response for call id " << resp->call_id() << " which " - << "was not pending! Ignoring."; + LOG(WARNING) << Substitute( + "$0: got a response for call id $1 which was not pending, ignoring", + ToString(), resp->call_id()); return; } @@ -736,8 +753,9 @@ void Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) { if (PREDICT_FALSE(!car->call)) { // The call already failed due to a timeout. - VLOG(1) << "Got response to call id " << resp->call_id() << " after client " - << "already timed out or cancelled"; + VLOG(1) << Substitute( + "got response to call id $0 after client already timed out or cancelled", + resp->call_id()); return; } @@ -747,7 +765,7 @@ void Connection::HandleCallResponse(unique_ptr<InboundTransfer> transfer) { MaybeInjectCancellation(car->call); } -void Connection::WriteHandler(ev::io &watcher, int revents) { +void Connection::WriteHandler(ev::io& /*watcher*/, int revents) { DCHECK(reactor_thread_->IsCurrentThread()); if (revents & EV_ERROR) { @@ -755,11 +773,12 @@ void Connection::WriteHandler(ev::io &watcher, int revents) { ": writeHandler encountered an error")); return; } - DVLOG(3) << ToString() << ": writeHandler: revents = " << revents; + DVLOG(3) << Substitute("$0: writeHandler: revents=$1", ToString(), revents); if (outbound_transfers_.empty()) { - LOG(WARNING) << ToString() << " got a ready-to-write callback, but there is " - "nothing to write."; + LOG(WARNING) << Substitute( + "$0 got a ready-to-write callback, but there is nothing to write", + ToString()); write_io_.stop(); return; } @@ -770,7 +789,7 @@ void Connection::WriteHandler(ev::io &watcher, int revents) { Connection::ProcessOutboundTransfersResult Connection::ProcessOutboundTransfers() { while (!outbound_transfers_.empty()) { - OutboundTransfer* transfer = &(outbound_transfers_.front()); + OutboundTransfer* transfer = &outbound_transfers_.front(); if (!transfer->TransferStarted()) { if (transfer->is_for_outbound_call()) { @@ -813,13 +832,14 @@ Connection::ProcessOutboundTransfersResult Connection::ProcessOutboundTransfers( last_activity_time_ = reactor_thread_->cur_time(); Status status = transfer->SendBuffer(socket_.get()); if (PREDICT_FALSE(!status.ok())) { - LOG(WARNING) << ToString() << " send error: " << status.ToString(); + LOG(WARNING) << Substitute( + "$0 send error: $1", ToString(), status.ToString()); reactor_thread_->DestroyConnection(this, status); return kConnectionDestroyed; } if (!transfer->TransferFinished()) { - DVLOG(3) << ToString() << ": writeHandler: xfer not finished."; + DVLOG(3) << Substitute("$0: writeHandler: xfer not finished", ToString()); return kMoreToSend; } @@ -830,14 +850,14 @@ Connection::ProcessOutboundTransfersResult Connection::ProcessOutboundTransfers( return kNoMoreToSend; } -std::string Connection::ToString() const { +string Connection::ToString() const { // This may be called from other threads, so we cannot // include anything in the output about the current state, // which might concurrently change from another thread. - return strings::Substitute( - "$0 $1", - direction_ == SERVER ? "server connection from" : "client connection to", - remote_.ToString()); + return Substitute("$0 $1", + direction_ == SERVER + ? "server connection from" + : "client connection to", remote_.ToString()); } // Reactor task that transitions this Connection from connection negotiation to @@ -847,22 +867,21 @@ class NegotiationCompletedTask : public ReactorTask { NegotiationCompletedTask(Connection* conn, Status negotiation_status, std::unique_ptr<ErrorStatusPB> rpc_error) - : conn_(conn), - negotiation_status_(std::move(negotiation_status)), - rpc_error_(std::move(rpc_error)) { + : conn_(conn), + negotiation_status_(std::move(negotiation_status)), + rpc_error_(std::move(rpc_error)) { } - void Run(ReactorThread *rthread) override { + void Run(ReactorThread* rthread) override { rthread->CompleteConnectionNegotiation(conn_, negotiation_status_, std::move(rpc_error_)); delete this; } - void Abort(const Status &status) override { + void Abort(const Status& status) override { DCHECK(conn_->reactor_thread()->reactor()->closing()); - VLOG(1) << "Failed connection negotiation due to shut down reactor thread: " - << status.ToString(); + VLOG(1) << Substitute("connection negotiation aborted: $0", status.ToString()); delete this; } @@ -885,7 +904,7 @@ void Connection::MarkNegotiationComplete() { } Status Connection::DumpPB(const DumpConnectionsRequestPB& req, - RpcConnectionPB* resp) { + RpcConnectionPB* resp) const { DCHECK(reactor_thread_->IsCurrentThread()); resp->set_remote_ip(remote_.ToString()); if (negotiation_complete_) { @@ -894,27 +913,31 @@ Status Connection::DumpPB(const DumpConnectionsRequestPB& req, resp->set_state(RpcConnectionPB::NEGOTIATING); } - if (direction_ == CLIENT) { - for (const car_map_t::value_type& entry : awaiting_response_) { - CallAwaitingResponse* c = entry.second; - if (c->call) { - c->call->DumpPB(req, resp->add_calls_in_flight()); + switch (direction_) { + case CLIENT: + for (const auto& [_, c]: awaiting_response_) { + if (c->call) { + c->call->DumpPB(req, resp->add_calls_in_flight()); + } } - } + resp->set_outbound_queue_size(outbound_transfers_.size()); + break; - resp->set_outbound_queue_size(num_queued_outbound_transfers()); - } else if (direction_ == SERVER) { - if (negotiation_complete_) { - // It's racy to dump credentials while negotiating, since the Connection - // object is owned by the negotiation thread at that point. - resp->set_remote_user_credentials(remote_user_.ToString()); - } - for (const inbound_call_map_t::value_type& entry : calls_being_handled_) { - InboundCall* c = entry.second; - c->DumpPB(req, resp->add_calls_in_flight()); - } - } else { - LOG(FATAL); + case SERVER: + if (negotiation_complete_) { + // It's racy to dump credentials while negotiating, since the Connection + // object is owned by the negotiation thread at that point. + resp->set_remote_user_credentials(remote_user_.ToString()); + } + for (const auto& [_, c]: calls_being_handled_) { + c->DumpPB(req, resp->add_calls_in_flight()); + } + break; + + default: + LOG(DFATAL) << Substitute("$0: invalid direction", + static_cast<uint16_t>(direction_)); + break; } #ifdef __linux__ if (negotiation_complete_ && remote_.is_ip()) { @@ -937,11 +960,10 @@ Status Connection::DumpPB(const DumpConnectionsRequestPB& req, Status Connection::GetSocketStatsPB(SocketStatsPB* pb) const { DCHECK(reactor_thread_->IsCurrentThread()); int fd = socket_->GetFd(); - CHECK_GE(fd, 0); + DCHECK_GE(fd, 0); // Fetch TCP_INFO statistics from the kernel. - tcp_info ti; - memset(&ti, 0, sizeof(ti)); + tcp_info ti = {}; socklen_t len = sizeof(ti); int rc = getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len); if (rc == 0) { @@ -1026,12 +1048,12 @@ Status Connection::GetTransportDetailsPB(TransportDetailsPB* pb) const { tls->set_cipher_suite(tls_socket->GetCipherDescription()); } - int fd = socket_->GetFd(); - CHECK_GE(fd, 0); + const int fd = socket_->GetFd(); + DCHECK_GE(fd, 0); int32_t max_seg_size = 0; socklen_t optlen = sizeof(max_seg_size); int ret = ::getsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, &max_seg_size, &optlen); - if (ret) { + if (PREDICT_FALSE(ret)) { int err = errno; return Status::NetworkError( "getsockopt(TCP_MAXSEG) failed", ErrnoToString(err), err); diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h index d0afbe536..eab48f54e 100644 --- a/src/kudu/rpc/connection.h +++ b/src/kudu/rpc/connection.h @@ -16,7 +16,6 @@ // under the License. #pragma once -#include <cstddef> #include <cstdint> #include <limits> #include <memory> @@ -89,8 +88,8 @@ class Connection : public RefCountedThreadSafe<Connection> { // remote: the address of the remote end // socket: the socket to take ownership of. // direction: whether we are the client or server side - Connection(ReactorThread *reactor_thread, - Sockaddr remote, + Connection(ReactorThread* reactor_thread, + const Sockaddr& remote, std::unique_ptr<Socket> socket, Direction direction, CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS); @@ -138,7 +137,7 @@ class Connection : public RefCountedThreadSafe<Connection> { // Cancel an outbound call by removing any reference to it by CallAwaitingResponse // in 'awaiting_responses_'. - void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call); + void CancelOutboundCall(const std::shared_ptr<OutboundCall>& call); // The address of the remote end of the connection. const Sockaddr& remote() const { return remote_; } @@ -184,10 +183,10 @@ class Connection : public RefCountedThreadSafe<Connection> { RpczStore* rpcz_store(); // libev callback when data is available to read. - void ReadHandler(ev::io &watcher, int revents); + void ReadHandler(ev::io& watcher, int revents); // NOLINT(google-runtime-references) // libev callback when we may write to the socket. - void WriteHandler(ev::io &watcher, int revents); + void WriteHandler(ev::io& watcher, int revents);// NOLINT(google-runtime-references) enum ProcessOutboundTransfersResult { // All of the transfers in the queue have been sent successfully. @@ -224,7 +223,7 @@ class Connection : public RefCountedThreadSafe<Connection> { void MarkNegotiationComplete(); Status DumpPB(const DumpConnectionsRequestPB& req, - RpcConnectionPB* resp); + RpcConnectionPB* resp) const; ReactorThread* reactor_thread() const { return reactor_thread_; } @@ -263,10 +262,6 @@ class Connection : public RefCountedThreadSafe<Connection> { scheduled_for_shutdown_ = true; } - size_t num_queued_outbound_transfers() const { - return outbound_transfers_.size(); - } - private: friend struct CallAwaitingResponse; friend class QueueTransferTask; @@ -279,9 +274,9 @@ class Connection : public RefCountedThreadSafe<Connection> { ~CallAwaitingResponse(); // Notification from libev that the call has timed out. - void HandleTimeout(ev::timer &watcher, int revents); + void HandleTimeout(ev::timer& watcher, int revents); // NOLINT(google-runtime-references) - Connection *conn; + Connection* conn; std::shared_ptr<OutboundCall> call; ev::timer timeout_timer; @@ -312,7 +307,7 @@ class Connection : public RefCountedThreadSafe<Connection> { // The given CallAwaitingResponse has elapsed its user-defined timeout. // Set it to Failed. - void HandleOutboundCallTimeout(CallAwaitingResponse *car); + void HandleOutboundCallTimeout(CallAwaitingResponse* car); // Queue a transfer for sending on this connection. // We will take ownership of the transfer. @@ -321,7 +316,7 @@ class Connection : public RefCountedThreadSafe<Connection> { // Internal test function for injecting cancellation request when 'call' // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'. - void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call); + void MaybeInjectCancellation(const std::shared_ptr<OutboundCall>& call); Status GetSocketStatsPB(SocketStatsPB* pb) const; @@ -344,7 +339,7 @@ class Connection : public RefCountedThreadSafe<Connection> { RemoteUser remote_user_; // whether we are client or server - Direction direction_; + const Direction direction_; // The last time we read or wrote from the socket. MonoTime last_activity_time_; diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc index 6820490b3..df1046005 100644 --- a/src/kudu/rpc/messenger.cc +++ b/src/kudu/rpc/messenger.cc @@ -269,9 +269,8 @@ Status Messenger::UnregisterService(const string& service_name) { return Status::OK(); } -void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall> &call) { - Reactor *reactor = RemoteToReactor(call->conn_id().remote()); - reactor->QueueOutboundCall(call); +void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall>& call) { + RemoteToReactor(call->conn_id().remote())->QueueOutboundCall(call); } void Messenger::QueueInboundCall(unique_ptr<InboundCall> call) { @@ -304,17 +303,16 @@ void Messenger::QueueInboundCall(unique_ptr<InboundCall> call) { WARN_NOT_OK((*service)->QueueInboundCall(std::move(call)), "Unable to handle RPC call"); } -void Messenger::QueueCancellation(const shared_ptr<OutboundCall> &call) { - Reactor *reactor = RemoteToReactor(call->conn_id().remote()); +void Messenger::QueueCancellation(const shared_ptr<OutboundCall>& call) { + Reactor* reactor = RemoteToReactor(call->conn_id().remote()); reactor->QueueCancellation(call); } -void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) { - Reactor *reactor = RemoteToReactor(remote); - reactor->RegisterInboundSocket(new_socket, remote); +void Messenger::RegisterInboundSocket(Socket* new_socket, const Sockaddr& remote) { + RemoteToReactor(remote)->RegisterInboundSocket(new_socket, remote); } -Messenger::Messenger(const MessengerBuilder &bld) +Messenger::Messenger(const MessengerBuilder& bld) : name_(bld.name_), state_(kStarted), authentication_(RpcAuthentication::REQUIRED), @@ -352,7 +350,7 @@ Messenger::~Messenger() { STLDeleteElements(&reactors_); } -Reactor* Messenger::RemoteToReactor(const Sockaddr& remote) { +Reactor* Messenger::RemoteToReactor(const Sockaddr& remote) const { // This is just a static partitioning; we could get a lot // fancier with assigning Sockaddrs to Reactors. return reactors_[remote.HashCode() % reactors_.size()]; diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h index 753681d46..a8044fca4 100644 --- a/src/kudu/rpc/messenger.h +++ b/src/kudu/rpc/messenger.h @@ -203,7 +203,7 @@ class MessengerBuilder { // list and // https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_ciphersuites.html // for SSL_CTX_set_ciphersuites() API details. - MessengerBuilder &set_rpc_tls_ciphersuites( + MessengerBuilder& set_rpc_tls_ciphersuites( const std::string& rpc_tls_ciphersuites) { rpc_tls_ciphersuites_ = rpc_tls_ciphersuites; return *this; @@ -211,7 +211,7 @@ class MessengerBuilder { // Set the minimum protocol version to allow when for securing RPC connections // with TLS. May be one of 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3'. - MessengerBuilder &set_rpc_tls_min_protocol( + MessengerBuilder& set_rpc_tls_min_protocol( const std::string& rpc_tls_min_protocol) { rpc_tls_min_protocol_ = rpc_tls_min_protocol; return *this; @@ -473,7 +473,7 @@ class Messenger { explicit Messenger(const MessengerBuilder& bld); - Reactor* RemoteToReactor(const Sockaddr& remote); + Reactor* RemoteToReactor(const Sockaddr& remote) const; Status Init(); void RunTimeoutThread(); void UpdateCurTime(); diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc index 9501a388a..b6b355960 100644 --- a/src/kudu/rpc/proxy.cc +++ b/src/kudu/rpc/proxy.cc @@ -20,6 +20,7 @@ #include <functional> #include <iostream> #include <memory> +#include <type_traits> #include <utility> #include <vector> @@ -27,7 +28,7 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" -#include "kudu/rpc/messenger.h" +#include "kudu/rpc/messenger.h" // IWYU pragma: keep #include "kudu/rpc/outbound_call.h" #include "kudu/rpc/remote_method.h" #include "kudu/rpc/response_callback.h" @@ -43,6 +44,7 @@ #include "kudu/util/user.h" using google::protobuf::Message; +using std::make_shared; using std::string; using std::shared_ptr; using std::unique_ptr; @@ -52,7 +54,7 @@ using strings::Substitute; namespace kudu { namespace rpc { -Proxy::Proxy(std::shared_ptr<Messenger> messenger, +Proxy::Proxy(shared_ptr<Messenger> messenger, const Sockaddr& remote, string hostname, string service_name) @@ -60,16 +62,16 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger, dns_resolver_(nullptr), messenger_(std::move(messenger)), is_started_(false) { - CHECK(messenger_ != nullptr); + DCHECK(messenger_); DCHECK(!service_name_.empty()) << "Proxy service name must not be blank"; DCHECK(remote.is_initialized()); // By default, we set the real user to the currently logged-in user. // Effective user and password remain blank. string real_user; - Status s = GetLoggedInUser(&real_user); - if (!s.ok()) { - LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: " - << s.ToString() << " before connecting to remote: " << remote.ToString(); + if (auto s = GetLoggedInUser(&real_user); !s.ok()) { + LOG(WARNING) << Substitute("$0: unable to get logged-in username " + "before connecting to remote $1 via $2 proxy", + s.ToString(), remote.ToString(), service_name_); } UserCredentials creds; @@ -77,7 +79,7 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger, conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds)); } -Proxy::Proxy(std::shared_ptr<Messenger> messenger, +Proxy::Proxy(shared_ptr<Messenger> messenger, HostPort hp, DnsResolver* dns_resolver, string service_name) @@ -86,12 +88,12 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger, dns_resolver_(dns_resolver), messenger_(std::move(messenger)), is_started_(false) { - CHECK(messenger_ != nullptr); + DCHECK(messenger_); DCHECK(!service_name_.empty()) << "Proxy service name must not be blank"; DCHECK(hp_.Initialized()); } -Sockaddr* Proxy::GetSingleSockaddr(std::vector<Sockaddr>* addrs) const { +Sockaddr* Proxy::GetSingleSockaddr(vector<Sockaddr>* addrs) const { DCHECK(!addrs->empty()); if (PREDICT_FALSE(addrs->size() > 1)) { LOG(WARNING) << Substitute( @@ -110,8 +112,9 @@ void Proxy::Init(Sockaddr addr) { string real_user; Status s = GetLoggedInUser(&real_user); if (!s.ok()) { - LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: " - << s.ToString() << " before connecting to host/port: " << hp_.ToString(); + LOG(WARNING) << Substitute( + "$0: unable to get logged-in username before connecting to $1 via $2 proxy", + s.ToString(), hp_.ToString(), service_name_); } vector<Sockaddr> addrs; if (!addr.is_initialized()) { @@ -141,9 +144,14 @@ void Proxy::EnqueueRequest(const string& method, OutboundCall::CallbackBehavior cb_behavior) const { ConnectionId connection = conn_id(); DCHECK(connection.remote().is_initialized()); - controller->call_.reset( - new OutboundCall(connection, {service_name_, method}, std::move(req_payload), - cb_behavior, response, controller, callback)); + controller->call_ = make_shared<OutboundCall>( + connection, + RemoteMethod{service_name_, method}, + std::move(req_payload), + cb_behavior, + response, + controller, + callback); controller->SetMessenger(messenger_.get()); // If this fails to queue, the callback will get called immediately @@ -151,7 +159,7 @@ void Proxy::EnqueueRequest(const string& method, messenger_->QueueOutboundCall(controller->call_); } -void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method, +void Proxy::RefreshDnsAndEnqueueRequest(const string& method, unique_ptr<RequestPayload> req_payload, google::protobuf::Message* response, RpcController* controller, @@ -169,7 +177,7 @@ void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method, // NOTE: we need to keep a reference here because the callback may end up // destructing the controller and the outbound call, _while_ the callback // is running from within the call! - auto shared_call = std::make_shared<OutboundCall>( + auto shared_call = make_shared<OutboundCall>( conn_id(), RemoteMethod{service_name_, method}, response, controller, callback); controller->call_ = shared_call; controller->call_->SetFailed(s.CloneAndPrepend("failed to refresh physical address")); @@ -194,7 +202,7 @@ void Proxy::AsyncRequest(const string& method, google::protobuf::Message* response, RpcController* controller, const ResponseCallback& callback) { - CHECK(!controller->call_) << "Controller should be reset"; + DCHECK(!controller->call_) << "Controller should be reset"; base::subtle::NoBarrier_Store(&is_started_, true); // TODO(awong): it would be great if we didn't have to heap allocate the // payload. @@ -261,18 +269,18 @@ Status Proxy::SyncRequest(const string& method, } void Proxy::set_user_credentials(UserCredentials user_credentials) { - CHECK(base::subtle::NoBarrier_Load(&is_started_) == false) - << "It is illegal to call set_user_credentials() after request processing has started"; + DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false) + << "illegal to call set_user_credentials() after request processing has started"; conn_id_.set_user_credentials(std::move(user_credentials)); } void Proxy::set_network_plane(string network_plane) { - CHECK(base::subtle::NoBarrier_Load(&is_started_) == false) - << "It is illegal to call set_network_plane() after request processing has started"; + DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false) + << "illegal to call set_network_plane() after request processing has started"; conn_id_.set_network_plane(std::move(network_plane)); } -std::string Proxy::ToString() const { +string Proxy::ToString() const { return Substitute("$0@$1", service_name_, conn_id_.ToString()); } diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc index 64ef18861..815119ed8 100644 --- a/src/kudu/rpc/reactor.cc +++ b/src/kudu/rpc/reactor.cc @@ -72,6 +72,7 @@ static const int kDefaultLibEvFlags = ev::KQUEUE; static const int kDefaultLibEvFlags = ev::AUTO; #endif +using std::function; using std::string; using std::shared_ptr; using std::unique_ptr; @@ -233,7 +234,7 @@ void ReactorThread::PollCompleteCb(struct ev_loop* loop) noexcept { } void ReactorThread::Shutdown(Messenger::ShutdownMode mode) { - CHECK(reactor_->closing()) << "Should be called after setting closing_ flag"; + DCHECK(reactor_->closing()) << "Should be called after setting closing_ flag"; VLOG(1) << name() << ": shutting down Reactor thread."; WakeThread(); @@ -241,7 +242,11 @@ void ReactorThread::Shutdown(Messenger::ShutdownMode mode) { if (mode == Messenger::ShutdownMode::SYNC) { // Join() will return a bad status if asked to join on the currently // running thread. - CHECK_OK(ThreadJoiner(thread_.get()).Join()); + const auto s = ThreadJoiner(thread_.get()).Join(); + if (PREDICT_FALSE(!s.ok())) { + LOG(DFATAL) << Substitute( + "$0: failed to join $1", s.ToString(), thread_->ToString()); + } } } @@ -707,7 +712,8 @@ void ReactorThread::DestroyConnection(Connection* conn, // Unlink connection from lists. if (conn->direction() == Connection::CLIENT) { const auto range = client_conns_.equal_range(conn->outbound_connection_id()); - CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString(); + DCHECK(range.first != range.second) + << "couldn't find connection " << conn->ToString(); // The client_conns_ container is a multi-map. for (auto it = range.first; it != range.second;) { if (it->second.get() == conn) { @@ -728,8 +734,7 @@ void ReactorThread::DestroyConnection(Connection* conn, } } -DelayedTask::DelayedTask(std::function<void(const Status&)> func, - MonoDelta when) +DelayedTask::DelayedTask(function<void(const Status&)> func, MonoDelta when) : func_(std::move(func)), when_(when), thread_(nullptr) { @@ -821,7 +826,7 @@ bool Reactor::closing() const { // Task to call an arbitrary function within the reactor thread. class RunFunctionTask : public ReactorTask { public: - explicit RunFunctionTask(std::function<Status()> f) + explicit RunFunctionTask(function<Status()> f) : function_(std::move(f)), latch_(1) {} void Run(ReactorThread* /*reactor*/) override { @@ -841,7 +846,7 @@ class RunFunctionTask : public ReactorTask { } private: - const std::function<Status()> function_; + const function<Status()> function_; Status status_; CountDownLatch latch_; }; @@ -850,7 +855,7 @@ Status Reactor::GetMetrics(ReactorMetrics* metrics) { return RunOnReactorThread([&]() { return this->thread_.GetMetrics(metrics); }); } -Status Reactor::RunOnReactorThread(std::function<Status()> f) { +Status Reactor::RunOnReactorThread(function<Status()> f) { RunFunctionTask task(std::move(f)); ScheduleReactorTask(&task); return task.Wait(); @@ -950,7 +955,7 @@ void Reactor::ScheduleReactorTask(ReactorTask* task) { bool was_empty; { std::unique_lock<LockType> l(lock_); - if (closing_) { + if (PREDICT_FALSE(closing_)) { // We guarantee the reactor lock is not taken when calling Abort(). l.unlock(); task->Abort(ShutdownError(false)); diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h index 8a966c01f..8944a2b2d 100644 --- a/src/kudu/rpc/reactor.h +++ b/src/kudu/rpc/reactor.h @@ -173,12 +173,12 @@ class ReactorThread { void RegisterTimeout(ev::timer* watcher); // This may be called from another thread. - const std::string &name() const; + const std::string& name() const; MonoTime cur_time() const; // This may be called from another thread. - Reactor *reactor(); + Reactor* reactor(); // Return true if this reactor thread is the thread currently // running. Should be used in DCHECK assertions. @@ -196,7 +196,7 @@ class ReactorThread { // Collect metrics. // Must be called from the reactor thread. - Status GetMetrics(ReactorMetrics *metrics); + Status GetMetrics(ReactorMetrics* metrics); private: friend class AssignOutboundCallTask; @@ -302,7 +302,7 @@ class ReactorThread { // List of current connections coming into the server. conn_list_t server_conns_; - Reactor *reactor_; + Reactor* reactor_; // If a connection has been idle for this much time, it is torn down. const MonoDelta connection_keepalive_time_; diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc index 1ce2da66d..d92c349b7 100644 --- a/src/kudu/util/net/socket.cc +++ b/src/kudu/util/net/socket.cc @@ -30,6 +30,7 @@ #include <limits> #include <ostream> #include <string> +#include <utility> #include <gflags/gflags.h> #include <glog/logging.h> @@ -61,20 +62,62 @@ TAG_FLAG(socket_inject_short_recvs, hidden); TAG_FLAG(socket_inject_short_recvs, unsafe); using std::string; +using std::numeric_limits; using strings::Substitute; namespace kudu { +namespace { + +Status ParseIpAddress(const string& addr_str, Sockaddr* result) { + DCHECK(!addr_str.empty()); + Sockaddr bind_host; + const auto s = bind_host.ParseString(addr_str, 0); + if (PREDICT_FALSE(!s.ok() || bind_host.port() != 0)) { + if (!s.ok()) { + return Status::InvalidArgument( + Substitute("$0: invalid local IP address", addr_str), s.ToString()); + } + return Status::InvalidArgument( + Substitute("$0: unexpected port with IP address", addr_str)); + } + + if (result) { + *result = std::move(bind_host); + } + return Status::OK(); +} + +bool ValidateLocalIpForOutboundSockets( + const char* flagname, const string& value) { + if (value.empty()) { + // The default value should pass the validation. + return true; + } + + if (auto s = ParseIpAddress(value, nullptr); !s.ok()) { + LOG(ERROR) << Substitute("invalid local IP '$0' for --$1: $2", + value, flagname, s.ToString()); + return false; + } + return true; +} +DEFINE_validator(local_ip_for_outbound_sockets, + &ValidateLocalIpForOutboundSockets); + +} // anonymous namespace + + Socket::Socket() - : fd_(-1) { + : fd_(-1) { } Socket::Socket(int fd) - : fd_(fd) { + : fd_(fd) { } Socket::Socket(Socket&& other) noexcept - : fd_(other.Release()) { + : fd_(other.Release()) { } void Socket::Reset(int fd) { @@ -259,12 +302,11 @@ Status Socket::SetReusePort(bool flag) { #endif } -Status Socket::BindAndListen(const Sockaddr &sockaddr, +Status Socket::BindAndListen(const Sockaddr& sockaddr, int listen_queue_size) { RETURN_NOT_OK(SetReuseAddr(true)); RETURN_NOT_OK(Bind(sockaddr)); - RETURN_NOT_OK(Listen(listen_queue_size)); - return Status::OK(); + return Listen(listen_queue_size); } Status Socket::Listen(int listen_queue_size) { @@ -275,7 +317,7 @@ Status Socket::Listen(int listen_queue_size) { return Status::OK(); } -Status Socket::GetSocketAddress(Sockaddr *cur_addr) const { +Status Socket::GetSocketAddress(Sockaddr* cur_addr) const { struct sockaddr_storage ss; socklen_t len = sizeof(ss); DCHECK_GE(fd_, 0); @@ -287,7 +329,7 @@ Status Socket::GetSocketAddress(Sockaddr *cur_addr) const { return Status::OK(); } -Status Socket::GetPeerAddress(Sockaddr *cur_addr) const { +Status Socket::GetPeerAddress(Sockaddr* cur_addr) const { struct sockaddr_storage addr; socklen_t len = sizeof(addr); DCHECK_GE(fd_, 0); @@ -332,7 +374,7 @@ Status Socket::Bind(const Sockaddr& bind_addr) { return Status::OK(); } -Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) { +Status Socket::Accept(Socket* new_conn, Sockaddr* remote, int flags) { TRACE_EVENT0("net", "Socket::Accept"); struct sockaddr_storage addr; socklen_t olen = sizeof(addr); @@ -371,16 +413,11 @@ Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) { Status Socket::BindForOutgoingConnection() { Sockaddr bind_host; - Status s = bind_host.ParseString(FLAGS_local_ip_for_outbound_sockets, 0); - CHECK(s.ok() && bind_host.port() == 0) - << "Invalid local IP set for 'local_ip_for_outbound_sockets': '" - << FLAGS_local_ip_for_outbound_sockets << "': " << s.ToString(); - - RETURN_NOT_OK(Bind(bind_host)); - return Status::OK(); + RETURN_NOT_OK(ParseIpAddress(FLAGS_local_ip_for_outbound_sockets, &bind_host)); + return Bind(bind_host); } -Status Socket::Connect(const Sockaddr &remote) { +Status Socket::Connect(const Sockaddr& remote) { TRACE_EVENT1("net", "Socket::Connect", "remote", remote.ToString()); if (PREDICT_FALSE(!FLAGS_local_ip_for_outbound_sockets.empty())) { @@ -412,7 +449,7 @@ Status Socket::GetSockError() const { return Status::OK(); } -Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) { +Status Socket::Write(const uint8_t* buf, int32_t amt, int32_t* nwritten) { if (amt <= 0) { return Status::NetworkError( StringPrintf("invalid send of %" PRId32 " bytes", @@ -429,8 +466,9 @@ Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) { return Status::OK(); } -Status Socket::Writev(const struct ::iovec *iov, int iov_len, - int64_t *nwritten) { +Status Socket::Writev(const struct ::iovec* iov, + int iov_len, + int64_t* nwritten) { if (PREDICT_FALSE(iov_len <= 0)) { return Status::NetworkError( StringPrintf("writev: invalid io vector length of %d", @@ -441,7 +479,7 @@ Status Socket::Writev(const struct ::iovec *iov, int iov_len, struct msghdr msg; memset(&msg, 0, sizeof(struct msghdr)); - msg.msg_iov = const_cast<iovec *>(iov); + msg.msg_iov = const_cast<iovec*>(iov); msg.msg_iovlen = iov_len; ssize_t res; RETRY_ON_EINTR(res, ::sendmsg(fd_, &msg, MSG_NOSIGNAL)); @@ -455,9 +493,9 @@ Status Socket::Writev(const struct ::iovec *iov, int iov_len, } // Mostly follows writen() from Stevens (2004) or Kerrisk (2010). -Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten, +Status Socket::BlockingWrite(const uint8_t* buf, size_t buflen, size_t* nwritten, const MonoTime& deadline) { - DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported"; + DCHECK_LE(buflen, numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported"; DCHECK(nwritten); size_t tot_written = 0; @@ -492,15 +530,15 @@ Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten } } - if (tot_written < buflen) { + if (PREDICT_FALSE(tot_written < buflen)) { return Status::IOError("Wrote zero bytes on a BlockingWrite() call", StringPrintf("Transferred %zu of %zu bytes", tot_written, buflen)); } return Status::OK(); } -Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) { - if (amt <= 0) { +Status Socket::Recv(uint8_t* buf, int32_t amt, int32_t* nread) { + if (PREDICT_FALSE(amt <= 0)) { return Status::NetworkError( StringPrintf("invalid recv of %d bytes", amt), Slice(), EINVAL); } @@ -535,8 +573,8 @@ Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) { // Mostly follows readn() from Stevens (2004) or Kerrisk (2010). // One place where we deviate: we consider EOF a failure if < amt bytes are read. -Status Socket::BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline) { - DCHECK_LE(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported"; +Status Socket::BlockingRecv(uint8_t* buf, size_t amt, size_t* nread, const MonoTime& deadline) { + DCHECK_LE(amt, numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported"; DCHECK(nread); size_t tot_read = 0; while (tot_read < amt) { diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h index 830a389e9..2ec3b071c 100644 --- a/src/kudu/util/net/socket.h +++ b/src/kudu/util/net/socket.h @@ -14,8 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#ifndef KUDU_UTIL_NET_SOCKET_H -#define KUDU_UTIL_NET_SOCKET_H +#pragma once #include <cstddef> #include <cstdint> @@ -94,18 +93,18 @@ class Socket { // 1) SetReuseAddr(true) // 2) Bind() // 3) Listen() - Status BindAndListen(const Sockaddr &sockaddr, int listen_queue_size); + Status BindAndListen(const Sockaddr& sockaddr, int listen_queue_size); // Start listening for new connections, with the given backlog size. // Requires that the socket has already been bound using Bind(). Status Listen(int listen_queue_size); // Call getsockname to get the address of this socket. - Status GetSocketAddress(Sockaddr *cur_addr) const; + Status GetSocketAddress(Sockaddr* cur_addr) const; // Call getpeername to get the address of the connected peer. // It is virtual so that tests can override. - virtual Status GetPeerAddress(Sockaddr *cur_addr) const; + virtual Status GetPeerAddress(Sockaddr* cur_addr) const; // Return true if this socket is determined to be a loopback connection // (i.e. the local and remote peer share an IP address). @@ -119,10 +118,10 @@ class Socket { Status Bind(const Sockaddr& bind_addr); // Call accept(2) to get a new connection. - Status Accept(Socket *new_conn, Sockaddr *remote, int flags); + Status Accept(Socket* new_conn, Sockaddr* remote, int flags); // start connecting this socket to a remote address. - Status Connect(const Sockaddr &remote); + Status Connect(const Sockaddr& remote); // get the error status using getsockopt(2) Status GetSockError() const; @@ -130,30 +129,30 @@ class Socket { // Write up to 'amt' bytes from 'buf' to the socket. The number of bytes // actually written will be stored in 'nwritten'. If an error is returned, // the value of 'nwritten' is undefined. - virtual Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten); + virtual Status Write(const uint8_t* buf, int32_t amt, int32_t* nwritten); // Vectorized Write. // If there is an error, that error needs to be resolved before calling again. // If there was no error, but not all the bytes were written, the unwritten // bytes must be retried. See writev(2) for more information. - virtual Status Writev(const struct ::iovec *iov, int iov_len, int64_t *nwritten); + virtual Status Writev(const struct ::iovec* iov, int iov_len, int64_t* nwritten); // Blocking Write call, returns IOError unless full buffer is sent. // Underlying Socket expected to be in blocking mode. Fails if any Write() sends 0 bytes. // Returns OK if buflen bytes were sent, otherwise IOError. // Upon return, nwritten will contain the number of bytes actually written. // See also writen() from Stevens (2004) or Kerrisk (2010) - Status BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten, + Status BlockingWrite(const uint8_t* buf, size_t buflen, size_t* nwritten, const MonoTime& deadline); - virtual Status Recv(uint8_t *buf, int32_t amt, int32_t *nread); + virtual Status Recv(uint8_t* buf, int32_t amt, int32_t* nread); // Blocking Recv call, returns IOError unless requested amt bytes are read. // Underlying Socket expected to be in blocking mode. Fails if any Recv() reads 0 bytes. // Returns OK if amt bytes were read, otherwise IOError. // Upon return, nread will contain the number of bytes actually read. // See also readn() from Stevens (2004) or Kerrisk (2010) - Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline); + Status BlockingRecv(uint8_t* buf, size_t amt, size_t* nread, const MonoTime& deadline); // Enable TCP keepalive for the underlying socket. A TCP keepalive probe will be sent // to the remote end after the connection has been idle for 'idle_time_s' seconds. @@ -183,5 +182,3 @@ class Socket { }; } // namespace kudu - -#endif