This is an automated email from the ASF dual-hosted git repository.

cmcfarlen pushed a commit to branch 10.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/10.0.x by this push:
     new 0b29acc372 jsonrpc: improve buffering and socket IO  #11763 #11815  
(#11857)
0b29acc372 is described below

commit 0b29acc372be4fb428987d9e4d471329211064b9
Author: Damian Meden <[email protected]>
AuthorDate: Fri Nov 15 02:57:56 2024 +0100

    jsonrpc: improve buffering and socket IO  #11763 #11815  (#11857)
    
    * JSONRPC - Use dynamic and configurable buffer for incoming jsonrpc 
messages.(#11763)
    
    (cherry picked from commit c20661125764e6472763d586cd736184782865cb)
    
    * jsonrpc: Wait for the socket to be ready to read. (#11815)
    
    jsonrpc: Add poll to wait for the socket to be ready to read.
    We found out that sometimes we hit a busy loop here, so adding a timeout
    to make sure we do not hangup forever here.
    
    (cherry picked from commit 5ef103643f2913d9fc4aba15f7846c01ccc75b8e)
---
 doc/admin-guide/files/jsonrpc.yaml.en.rst        |   4 +
 include/mgmt/rpc/server/IPCSocketServer.h        |  15 ++-
 include/shared/rpc/IPCSocketClient.h             |   9 +-
 include/shared/rpc/MessageStorage.h              | 104 ++++++++++++++++++
 include/shared/rpc/RPCClient.h                   |   7 +-
 src/mgmt/rpc/server/IPCSocketServer.cc           |  43 ++++----
 src/mgmt/rpc/server/unit_tests/test_rpcserver.cc |  49 +++++++--
 src/shared/rpc/IPCSocketClient.cc                | 129 +++++++----------------
 8 files changed, 232 insertions(+), 128 deletions(-)

diff --git a/doc/admin-guide/files/jsonrpc.yaml.en.rst 
b/doc/admin-guide/files/jsonrpc.yaml.en.rst
index 26aff3942f..de622e724f 100644
--- a/doc/admin-guide/files/jsonrpc.yaml.en.rst
+++ b/doc/admin-guide/files/jsonrpc.yaml.en.rst
@@ -110,6 +110,10 @@ Field Name                            Description
                                       ``true`` by default.
                                       In case of an unauthorized call is made, 
a corresponding rpc error will be returned, you can
                                       check 
:ref:`jsonrpc-node-errors-unauthorized-action` for details about the errors.
+``incoming_request_max_size``         Maximum allowed size for the incoming 
jsonrpc request. Default ``96000`` bytes. Size must be
+                                      specified in bytes. Note that memory 
will not be allocated all at once, if incoming message
+                                      does not fit in the first chunk of 
memory(32K) an extra amount will be allocated, till the
+                                      requests size hits the max size.
 ===================================== 
=========================================================================================
 
 
diff --git a/include/mgmt/rpc/server/IPCSocketServer.h 
b/include/mgmt/rpc/server/IPCSocketServer.h
index a44dc1b373..5a9bda39ac 100644
--- a/include/mgmt/rpc/server/IPCSocketServer.h
+++ b/include/mgmt/rpc/server/IPCSocketServer.h
@@ -35,6 +35,7 @@
 
 #include "mgmt/rpc/server/CommBase.h"
 #include "mgmt/rpc/config/JsonRPCConfig.h"
+#include "shared/rpc/MessageStorage.h"
 
 namespace rpc::comm
 {
@@ -53,6 +54,8 @@ class IPCSocketServer : public BaseCommInterface
     PEER_CREDENTIALS_ERROR = 1, ///< Error while trying to read the peer 
credentials from the unix socket.
     PERMISSION_DENIED      = 2  ///< Client's socket credential didn't wasn't 
sufficient to execute the method.
   };
+  static const size_t INTERNAL_BUFFER_SIZE{32000};
+  using Buffer = MessageStorage<INTERNAL_BUFFER_SIZE>;
   ///
   /// @brief Connection abstraction class that deals with sending and 
receiving data from the connected peer.
   ///
@@ -60,7 +63,7 @@ class IPCSocketServer : public BaseCommInterface
   /// the client object around.
   struct Client {
     /// @param fd Peer's socket.
-    Client(int fd);
+    Client(int fd, size_t max_req_size);
     /// Destructor will close the socket(if opened);
     ~Client();
 
@@ -74,7 +77,7 @@ class IPCSocketServer : public BaseCommInterface
     /// The size of the buffer to be read is not defined in this function, but 
rather passed in the @c bw parameter.
     /// @return A tuple with a boolean flag indicating if the operation did 
success or not, in case of any error, a text will
     /// be added with a description.
-    std::tuple<bool, std::string> read_all(swoc::FixedBufferWriter &bw) const;
+    std::tuple<bool, std::string> read_all(Buffer &bw) const;
     /// Write the the socket with the passed data.
     /// @return std::error_code.
     void write(std::string const &data, std::error_code &ec) const;
@@ -83,8 +86,9 @@ class IPCSocketServer : public BaseCommInterface
   private:
     /// Wait for data to be ready for reading.
     /// @return true if the data is ready, false otherwise.
-    bool poll_for_data(std::chrono::milliseconds timeout) const;
-    int  _fd; ///< connected peer's socket.
+    bool   poll_for_data(std::chrono::milliseconds timeout) const;
+    int    _fd;           ///< connected peer's socket.
+    size_t _max_req_size; ///< Max incoming request size.
   };
 
 public:
@@ -114,6 +118,7 @@ protected: // unit test access
     static constexpr auto BACKLOG_KEY_STR{"backlog"};
     static constexpr auto 
MAX_RETRY_ON_TR_ERROR_KEY_STR{"max_retry_on_transient_errors"};
     static constexpr auto RESTRICTED_API{"restricted_api"};
+    static constexpr auto MAX_BUFFER_SIZE{"incoming_request_max_size"};
     // is it safe to call Layout now?
     std::string sockPathName;
     std::string lockPathName;
@@ -122,6 +127,7 @@ protected: // unit test access
     int  maxRetriesOnTransientErrors{64};
     bool restrictedAccessApi{
       NON_RESTRICTED_API}; // This config value will drive the permissions of 
the jsonrpc socket(either 0700(default) or 0777).
+    size_t incomingRequestMaxBufferSize{INTERNAL_BUFFER_SIZE * 3};
   };
 
   friend struct YAML::convert<rpc::comm::IPCSocketServer::Config>;
@@ -142,5 +148,6 @@ private:
 
   struct sockaddr_un _serverAddr;
   int                _socket{-1};
+  int                _lock_fd{-1};
 };
 } // namespace rpc::comm
diff --git a/include/shared/rpc/IPCSocketClient.h 
b/include/shared/rpc/IPCSocketClient.h
index 891c2581d7..17e531b75f 100644
--- a/include/shared/rpc/IPCSocketClient.h
+++ b/include/shared/rpc/IPCSocketClient.h
@@ -43,7 +43,7 @@ using namespace std::chrono_literals;
 ///
 /// Error handling: Enclose this inside a try/catch because if any error is 
detected functions will throw.
 struct IPCSocketClient {
-  enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, STREAM_ERROR, UNKNOWN };
+  enum class ReadStatus { NO_ERROR = 0, BUFFER_FULL, READ_ERROR, TIMEOUT, 
UNKNOWN };
   using self_reference = IPCSocketClient &;
 
   IPCSocketClient(std::string path = "/tmp/jsonrpc20.sock") : 
_path{std::move(path)} { memset(&_server, 0, sizeof(_server)); }
@@ -52,13 +52,14 @@ struct IPCSocketClient {
 
   /// Connect to the configured socket path.
   /// Connection will retry every @c ms for @c attempts times if errno is 
EAGAIN
-  self_reference connect(std::chrono::milliseconds ms = 40ms, int attempts = 
5);
+  self_reference connect(std::chrono::milliseconds wait_ms = 40ms, int 
attempts = 5);
 
   /// Send all the passed string to the socket.
   self_reference send(std::string_view data);
 
-  /// Read all the content from the socket till the message is complete.
-  ReadStatus read_all(std::string &content);
+  /// Read all the content until the fd closes or timeout( @c timeout_ms * @c 
attempts) has passed.
+  /// @return @c ReadStatus will be set accordingly with the operation result.
+  ReadStatus read_all(std::string &content, std::chrono::milliseconds 
timeout_ms = 1000ms, int attempts = 10);
 
   /// Closes the socket.
   void
diff --git a/include/shared/rpc/MessageStorage.h 
b/include/shared/rpc/MessageStorage.h
new file mode 100644
index 0000000000..8d455d35ef
--- /dev/null
+++ b/include/shared/rpc/MessageStorage.h
@@ -0,0 +1,104 @@
+/**
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+#pragma once
+
+#include <sstream>
+#include <swoc/BufferWriter.h>
+
+/// @brief Simple storage to keep the jsonrpc server's response.
+///
+///        With small content it will just use the LocalBufferWriter, if the
+///        content gets bigger, then it will just save the buffer into a string
+///        and reuse the already created LocalBufferWriter. If stored data 
fits in the
+///        original bw, then no need to create the extra string. If message 
fist in the
+///        first chunk, no extra space will be allocated into the storage 
string.
+///        This is not meant to be performant as is mainly used for single 
request
+///        data storage, which will be at some point stored into a string 
anyways.
+/// @note  User should deal with the buffer limit.
+///
+template <size_t N> class MessageStorage
+{
+  std::string                _content;
+  swoc::LocalBufferWriter<N> _bw;
+  size_t                     _written{0};
+
+public:
+  char *
+  writable_data()
+  {
+    return _bw.aux_data();
+  }
+
+  void
+  save(size_t n)
+  {
+    _bw.commit(n);
+
+    if (_bw.remaining() == 0) { // no more space available, flush what's on 
the bw
+                                // and reset it.
+      flush();
+    }
+  }
+
+  size_t
+  available() const
+  {
+    return _bw.remaining();
+  }
+
+  void
+  flush()
+  {
+    if (_bw.size() == 0) {
+      return;
+    }
+
+    if (_written == 0) {
+      _content.reserve(_bw.size());
+    } else {
+      // need more space.
+      _content.reserve(_written + _bw.size());
+    }
+
+    _content.append(_bw.data(), _bw.size());
+    _written += _bw.size();
+
+    _bw.clear();
+  }
+
+  std::string
+  str()
+  {
+    if (stored() <= _bw.size()) {
+      // just get it directly from the BW.
+      return {_bw.data(), _bw.size()};
+    }
+
+    // There is content in the bw that needs to be saved into the internal 
string.
+    flush();
+    return _content;
+  }
+
+  size_t
+  stored() const
+  {
+    return _written ? _written : _bw.size();
+  }
+};
diff --git a/include/shared/rpc/RPCClient.h b/include/shared/rpc/RPCClient.h
index e4492f156c..57c629f1aa 100644
--- a/include/shared/rpc/RPCClient.h
+++ b/include/shared/rpc/RPCClient.h
@@ -67,8 +67,11 @@ public:
           // responses.
           ink_assert(!"Buffer full, not enough space to read the response.");
           break;
-        case IPCSocketClient::ReadStatus::STREAM_ERROR:
-          err_text = swoc::bwprint(err_text, "STREAM_ERROR: Error while 
reading response. {}({})", std::strerror(errno), errno);
+        case IPCSocketClient::ReadStatus::READ_ERROR:
+          err_text = swoc::bwprint(err_text, "READ_ERROR: Error while reading 
response. {}({})", std::strerror(errno), errno);
+          break;
+        case IPCSocketClient::ReadStatus::TIMEOUT:
+          err_text = swoc::bwprint(err_text, "TIMEOUT: Couldn't get the 
response. {}({})", std::strerror(errno), errno);
           break;
         default:
           err_text = "Something happened, we can't read the response. Unknown 
error.";
diff --git a/src/mgmt/rpc/server/IPCSocketServer.cc 
b/src/mgmt/rpc/server/IPCSocketServer.cc
index 6f78793917..9e0ec7cbf8 100644
--- a/src/mgmt/rpc/server/IPCSocketServer.cc
+++ b/src/mgmt/rpc/server/IPCSocketServer.cc
@@ -207,7 +207,6 @@ IPCSocketServer::run()
 {
   _running.store(true);
 
-  swoc::LocalBufferWriter<MAX_REQUEST_BUFFER_SIZE> bw;
   while (_running) {
     // poll till socket it's ready.
     if (!this->poll_for_new_client()) {
@@ -219,11 +218,12 @@ IPCSocketServer::run()
 
     std::error_code ec;
     if (int fd = this->accept(ec); !ec) {
-      Client client{fd};
+      Client client{fd, _conf.incomingRequestMaxBufferSize};
+      Buffer bw;
 
       if (auto [ok, errStr] = client.read_all(bw); ok) {
-        const auto   json = std::string{bw.data(), bw.size()};
-        rpc::Context ctx;
+        const std::string &json = bw.str();
+        rpc::Context       ctx;
         // we want to make sure the peer's credentials are ok.
         ctx.get_auth().add_checker(
           [&](TSRPCHandlerOptions const &opt, swoc::Errata &errata) -> void { 
late_check_peer_credentials(fd, opt, errata); });
@@ -240,8 +240,6 @@ IPCSocketServer::run()
     } else {
       Dbg(dbg_ctl, "Error while accepting a new connection on the socket: %s", 
ec.message().c_str());
     }
-
-    bw.clear();
   }
 
   this->close();
@@ -294,13 +292,14 @@ IPCSocketServer::accept(std::error_code &ec) const
 void
 IPCSocketServer::bind(std::error_code &ec)
 {
-  int lock_fd = open(_conf.lockPathName.c_str(), O_RDONLY | O_CREAT, 0600);
-  if (lock_fd == -1) {
+  _lock_fd = open(_conf.lockPathName.c_str(), O_RDONLY | O_CREAT, 0600);
+  if (_lock_fd == -1) {
     ec = std::make_error_code(static_cast<std::errc>(errno));
     return;
   }
 
-  int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
+  int ret = flock(_lock_fd, LOCK_EX | LOCK_NB);
+
   if (ret != 0) {
     ec = std::make_error_code(static_cast<std::errc>(errno));
     return;
@@ -346,10 +345,15 @@ IPCSocketServer::close()
     ::close(_socket);
     _socket = -1;
   }
+
+  if (_lock_fd > 0) {
+    ::close(_lock_fd);
+    _lock_fd = -1;
+  }
 }
 //// client
 
-IPCSocketServer::Client::Client(int fd) : _fd{fd} {}
+IPCSocketServer::Client::Client(int fd, size_t max_req_size) : _fd{fd}, 
_max_req_size{max_req_size} {}
 IPCSocketServer::Client::~Client()
 {
   this->close();
@@ -395,11 +399,11 @@ IPCSocketServer::Client::read(swoc::MemSpan<char> span) 
const
 }
 
 std::tuple<bool, std::string>
-IPCSocketServer::Client::read_all(swoc::FixedBufferWriter &bw) const
+IPCSocketServer::Client::read_all(Buffer &bw) const
 {
   std::string buff;
-  while (bw.remaining() > 0) {
-    auto ret = read({bw.aux_data(), bw.remaining()});
+  while (true) {
+    auto ret = read({bw.writable_data(), bw.available()});
     if (ret < 0) {
       if (check_for_transient_errors()) {
         continue;
@@ -409,20 +413,20 @@ IPCSocketServer::Client::read_all(swoc::FixedBufferWriter 
&bw) const
     }
 
     if (ret == 0) {
-      if (bw.size()) {
-        return {false, swoc::bwprint(buff, "Peer disconnected after reading {} 
bytes.", bw.size())};
+      if (bw.stored()) {
+        return {false, swoc::bwprint(buff, "Peer disconnected after reading {} 
bytes.", bw.stored())};
       }
       return {false, swoc::bwprint(buff, "Peer disconnected. EOF")};
     }
-    bw.commit(ret);
-    if (bw.remaining() > 0) {
+    bw.save(ret);
+    if (_max_req_size - bw.stored() > 0) { // we can still read more.
       using namespace std::chrono_literals;
       if (!this->poll_for_data(1ms)) {
         return {true, buff};
       }
       continue;
     } else {
-      swoc::bwprint(buff, "Buffer is full, we hit the limit: {}", 
bw.capacity());
+      swoc::bwprint(buff, "Buffer is full, we hit the limit: {}", 
_max_req_size);
       break;
     }
   }
@@ -496,6 +500,9 @@ template <> struct 
convert<rpc::comm::IPCSocketServer::Config> {
     if (auto n = node[config::RESTRICTED_API]) {
       rhs.restrictedAccessApi = n.as<bool>();
     }
+    if (auto n = node[config::MAX_BUFFER_SIZE]) {
+      rhs.incomingRequestMaxBufferSize = n.as<size_t>();
+    }
     return true;
   }
 };
diff --git a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc 
b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
index 006e121534..d560b80591 100644
--- a/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
+++ b/src/mgmt/rpc/server/unit_tests/test_rpcserver.cc
@@ -72,6 +72,7 @@ const std::string sockPath{"tests/var/jsonrpc20_test.sock"};
 const std::string lockPath{"tests/var/jsonrpc20_test.lock"};
 constexpr int     default_backlog{5};
 constexpr int     default_maxRetriesOnTransientErrors{64};
+constexpr size_t  default_incoming_req_max_size{32000 * 3};
 DbgCtl            dbg_ctl{"rpc.test.client"};
 
 } // end anonymous namespace
@@ -85,7 +86,7 @@ struct RPCServerTestListener : Catch::TestEventListenerBase {
   testRunStarting(Catch::TestRunInfo const & /* testRunInfo ATS_UNUSED */) 
override
   {
     Layout::create();
-    init_diags("rpc|rpc.test", nullptr);
+    init_diags("rpc", nullptr);
     RecProcessInit();
 
     signal(SIGPIPE, SIG_IGN);
@@ -100,7 +101,7 @@ struct RPCServerTestListener : Catch::TestEventListenerBase 
{
     rpc::config::RPCConfig serverConfig;
 
     auto confStr{R"({"rpc": { "enabled": true, "unix": { "lock_path_name": ")" 
+ lockPath + R"(", "sock_path_name": ")" + sockPath +
-                 R"(",  "backlog": 5,"max_retry_on_transient_errors": 64 
}}})"};
+                 R"(",  "backlog": 5,"max_retry_on_transient_errors": 64, 
"incoming_request_max_size": 32000 }}})"};
     YAML::Node configNode = YAML::Load(confStr);
     serverConfig.load(configNode["rpc"]);
     try {
@@ -116,21 +117,36 @@ struct RPCServerTestListener : 
Catch::TestEventListenerBase {
   void
   testRunEnded(Catch::TestRunStats const & /* testRunStats ATS_UNUSED */) 
override
   {
-    // jsonrpcServer->stop_thread();
-    // delete main_thread;
     if (jsonrpcServer) {
-      delete jsonrpcServer;
+      delete jsonrpcServer; // will stop the thread
     }
   }
 
 private:
-  // std::unique_ptr<rpc::RPCServer> jrpcServer;
   std::unique_ptr<EThread> main_thread;
 };
 CATCH_REGISTER_LISTENER(RPCServerTestListener)
 
 RPCServerTestListener::~RPCServerTestListener() {}
 
+void
+restart_json_rpc_server(YAML::Node n)
+{
+  rpc::config::RPCConfig serverConfig;
+  serverConfig.load(n["rpc"]);
+
+  if (jsonrpcServer) {
+    delete jsonrpcServer;
+  }
+
+  try {
+    jsonrpcServer = new rpc::RPCServer(serverConfig);
+    jsonrpcServer->start_thread();
+  } catch (std::exception const &ex) {
+    Dbg(dbg_ctl, "Oops: %s", ex.what());
+  }
+}
+
 DEFINE_JSONRPC_PROTO_FUNCTION(some_foo) // id, params
 {
   swoc::Rv<YAML::Node> resp;
@@ -325,12 +341,12 @@ TEST_CASE("Basic message sending to a running server", 
"[socket]")
 
 TEST_CASE("Sending a message bigger than the internal server's buffer. 32000", 
"[buffer][error]")
 {
-  REQUIRE(rpc::add_method_handler("do_nothing", &do_nothing));
+  REQUIRE(rpc::add_method_handler("do_nothing32000", &do_nothing));
+  const int S{32000}; // + the rest of the json message.
+  auto json{R"({"jsonrpc": "2.0", "method": "do_nothing32000", "params": 
{"msg":")" + random_string(S) + R"("}, "id":"32k_1"})"};
 
   SECTION("Message larger than the the accepted size.")
   {
-    const int S{32000}; // + the rest of the json message.
-    auto      json{R"({"jsonrpc": "2.0", "method": "do_nothing", "params": 
{"msg":")" + random_string(S) + R"("}, "id":"EfGh-1"})"};
     REQUIRE_NOTHROW([&]() {
       ScopedLocalSocket rpc_client;
       auto              resp = rpc_client.query(json);
@@ -338,7 +354,19 @@ TEST_CASE("Sending a message bigger than the internal 
server's buffer. 32000", "
     }());
   }
 
-  REQUIRE(rpc::test_remove_handler("do_nothing"));
+  SECTION("Retry the big message after reconfigure(restart rpc server) the 
incoming request size limit.")
+  {
+    auto confStr{R"({"rpc": { "enabled": true, "unix": { "lock_path_name": ")" 
+ lockPath + R"(", "sock_path_name": ")" + sockPath +
+                 R"(",  "backlog": 5,"max_retry_on_transient_errors": 64, 
"incoming_request_max_size": 62000 }}})"};
+    YAML::Node n = YAML::Load(confStr);
+    restart_json_rpc_server(n);
+    REQUIRE_NOTHROW([&]() {
+      ScopedLocalSocket rpc_client;
+      auto              resp = rpc_client.query(json);
+      REQUIRE(resp == R"({"jsonrpc": "2.0", "result": {"size": "32000"}, "id": 
"32k_1"})");
+    }());
+  }
+  REQUIRE(rpc::test_remove_handler("do_nothing32000"));
 }
 
 TEST_CASE("Test with invalid json message", "[socket]")
@@ -515,6 +543,7 @@ TEST_CASE("Test configuration parsing from a YAML node. UDS 
values", "[string]")
   REQUIRE(socket->get_conf().maxRetriesOnTransientErrors == 
default_maxRetriesOnTransientErrors);
   REQUIRE(socket->get_conf().sockPathName == sockPath);
   REQUIRE(socket->get_conf().lockPathName == lockPath);
+  REQUIRE(socket->get_conf().incomingRequestMaxBufferSize == 
default_incoming_req_max_size);
 }
 
 TEST_CASE("Test configuration parsing from a file. UDS Server", "[file]")
diff --git a/src/shared/rpc/IPCSocketClient.cc 
b/src/shared/rpc/IPCSocketClient.cc
index 2c8ba9b3e2..ddb09010e4 100644
--- a/src/shared/rpc/IPCSocketClient.cc
+++ b/src/shared/rpc/IPCSocketClient.cc
@@ -21,89 +21,21 @@
 
 #include <stdexcept>
 #include <chrono>
-#include <sstream>
 #include <utility>
 #include <thread>
 
 #include "tsutil/ts_bw_format.h"
 
 #include "shared/rpc/IPCSocketClient.h"
+#include "shared/rpc/MessageStorage.h"
 #include <tscore/ink_assert.h>
 #include <tscore/ink_sock.h>
 
-namespace
-{
-/// @brief Simple buffer to store the jsonrpc server's response.
-///
-///        With small content it will just use the LocalBufferWritter, if the
-///        content gets bigger, then it will just save the buffer into a stream
-///        and reuse the already created BufferWritter.
-template <size_t N> class BufferStream
-{
-  std::ostringstream         _os;
-  swoc::LocalBufferWriter<N> _bw;
-  size_t                     _written{0};
-
-public:
-  char *
-  writable_data()
-  {
-    return _bw.aux_data();
-  }
-
-  void
-  save(size_t n)
-  {
-    _bw.commit(n);
-
-    if (_bw.remaining() == 0) { // no more space available, flush what's on 
the bw
-                                // and reset it.
-      flush();
-    }
-  }
-
-  size_t
-  available() const
-  {
-    return _bw.remaining();
-  }
-
-  void
-  flush()
-  {
-    if (_bw.size() == 0) {
-      return;
-    }
-    _os.write(_bw.view().data(), _bw.size());
-    _written += _bw.size();
-
-    _bw.clear();
-  }
-
-  std::string
-  str()
-  {
-    if (stored() <= _bw.size()) {
-      return {_bw.data(), _bw.size()};
-    }
-
-    flush();
-    return _os.str();
-  }
-
-  size_t
-  stored() const
-  {
-    return _written ? _written : _bw.size();
-  }
-};
-} // namespace
-
 namespace shared::rpc
 {
 
 IPCSocketClient::self_reference
-IPCSocketClient::connect(std::chrono::milliseconds ms, int attempts)
+IPCSocketClient::connect(std::chrono::milliseconds wait_ms, int attempts)
 {
   std::string text;
   int         err, tries{attempts};
@@ -135,7 +67,7 @@ IPCSocketClient::connect(std::chrono::milliseconds ms, int 
attempts)
     if (errno == EAGAIN || errno == EINPROGRESS) {
       // Connection cannot be completed immediately
       // EAGAIN for UDS should suffice, but just in case.
-      std::this_thread::sleep_for(ms);
+      std::this_thread::sleep_for(wait_ms);
       err = errno;
       continue;
     } else {
@@ -185,35 +117,52 @@ IPCSocketClient ::send(std::string_view data)
 }
 
 IPCSocketClient::ReadStatus
-IPCSocketClient::read_all(std::string &content)
+IPCSocketClient::read_all(std::string &content, std::chrono::milliseconds 
timeout_ms, int attempts)
 {
   if (this->is_closed()) {
     // we had a failure.
-    return {};
+    return ReadStatus::UNKNOWN;
   }
 
-  BufferStream<356000> bs;
-
-  ReadStatus readStatus{ReadStatus::UNKNOWN};
-  while (true) {
+  MessageStorage<356000> bs;
+  int                    attempts_left{attempts};
+  ReadStatus             readStatus{ReadStatus::NO_ERROR};
+  // Try to read all the data from the socket. If a timeout happens we retry
+  // 'attemps' times. On error we just stop.
+  while (attempts_left > 0 || readStatus == ReadStatus::NO_ERROR) {
     auto       buf     = bs.writable_data();
-    const auto to_read = bs.available();
-    ssize_t    ret{-1};
-    do {
-      ret = ::read(_sock, buf, to_read);
-    } while (ret < 0 && (errno == EAGAIN || errno == EINTR));
-
-    if (ret > 0) {
-      bs.save(ret);
+    const auto to_read = bs.available(); // Available in the current memory 
chunk.
+    ssize_t    nread{-1};
+
+    // Try again if timed out.
+    if (auto const r = read_ready(_sock, timeout_ms.count()); r == 0) {
+      readStatus = ReadStatus::TIMEOUT;
+      --attempts_left;
       continue;
-    } else {
-      if (bs.stored() > 0) {
-        readStatus = ReadStatus::NO_ERROR;
-        break;
+    } else if (r < 0) {
+      // No more tries.
+      readStatus = ReadStatus::READ_ERROR;
+      break;
+    }
+
+    nread = ::read(_sock, buf, to_read);
+    if (nread > 0) {
+      bs.save(nread);
+      continue;
+    } else if (nread == -1) {
+      if (errno == EAGAIN || errno == EINTR) {
+        continue;
       }
-      readStatus = ReadStatus::STREAM_ERROR;
+      readStatus = ReadStatus::READ_ERROR;
+      break;
+    }
+    // EOF
+    if (bs.stored() > 0) {
+      readStatus = ReadStatus::NO_ERROR;
       break;
     }
+    readStatus = ReadStatus::READ_ERROR;
+    break;
   }
   content = bs.str();
   return readStatus;

Reply via email to