szaszm commented on code in PR #1595:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1280578871


##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -95,40 +151,29 @@ void ControllerSocketProtocol::initialize() {
   configuration_->get(Configuration::controller_socket_host, host);
 
   std::string port;
+  stopListener();
   if (configuration_->get(Configuration::controller_socket_port, port)) {
-    if (nullptr != secure_context) {
-#ifdef OPENSSL_SUPPORT
-      // if there is no openssl support we won't be using SSL
-      auto tls_context = std::make_shared<io::TLSContext>(configuration_, 
secure_context);
-      server_socket_ = std::unique_ptr<io::BaseServerSocket>(new 
io::TLSServerSocket(tls_context, host, std::stoi(port), 2));
-#else
-      server_socket_ = std::unique_ptr<io::BaseServerSocket>(new 
io::ServerSocket(nullptr, host, std::stoi(port), 2));
-#endif
-    } else {
-      server_socket_ = std::unique_ptr<io::BaseServerSocket>(new 
io::ServerSocket(nullptr, host, std::stoi(port), 2));
-    }
-    // if we have a localhost hostname and we did not manually specify 
any.interface we will
-    // bind only to the loopback adapter
+    // if we have a localhost hostname and we did not manually specify 
any.interface we will bind only to the loopback adapter
     if ((host == "localhost" || host == "127.0.0.1" || host == "::") && 
!any_interface) {
-      server_socket_->initialize(true);
+      acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(io_context_, 
asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), std::stoi(port)));
     } else {
-      server_socket_->initialize(false);
+      acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(io_context_, 
asio::ip::tcp::endpoint(asio::ip::tcp::v4(), std::stoi(port)));
     }
 
-    auto check = [this]() -> bool {
-      return update_sink_.isRunning();
-    };
-
-    auto handler = [this](io::BaseStream *stream) {
-      handleCommand(stream);
-    };
-    server_socket_->registerCallback(check, handler);
+    if (secure_context) {
+      co_spawn(io_context_, startAcceptSsl(std::move(secure_context)), 
asio::detached);
+    } else {
+      co_spawn(io_context_, startAccept(), asio::detached);
+    }
+    server_thread_ = std::thread([this] {
+      io_context_.run();
+    });

Review Comment:
   Can we share the `io_context` with C2?



##########
libminifi/src/io/InputStream.cpp:
##########
@@ -71,18 +67,20 @@ size_t InputStream::read(std::string &str, bool widen) {
     return length_return;
   }
 
-  std::vector<std::byte> buffer(string_length);
-  const auto read_return = read(buffer);
-  if (read_return != string_length) {
-    return read_return;
+  str.clear();
+  str.reserve(string_length);
+
+  auto bytes_to_read = string_length;
+  while (bytes_to_read > 0) {
+    std::vector<std::byte> buffer(bytes_to_read);
+    const auto read_return = read(buffer);
+    if (io::isError(read_return))
+      return read_return;
+    bytes_to_read -= read_return;
+    str.append(std::string(reinterpret_cast<const char*>(buffer.data()), 
read_return));

Review Comment:
   We could retry a couple of times when `read_return` is 0 before bailing out.
   I'm not sure whether it changes anything in practice.



##########
libminifi/src/c2/ControllerSocketProtocol.cpp:
##########
@@ -325,33 +370,33 @@ void 
ControllerSocketProtocol::handleDescribe(io::BaseStream *stream) {
   }
 }
 
-void ControllerSocketProtocol::handleCommand(io::BaseStream *stream) {
+asio::awaitable<void> 
ControllerSocketProtocol::handleCommand(std::unique_ptr<io::BaseStream>&& 
stream) {

Review Comment:
   What do we get by making this an equivalent coroutine?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to