szaszm commented on code in PR #1595: URL: https://github.com/apache/nifi-minifi-cpp/pull/1595#discussion_r1280578871
########## libminifi/src/c2/ControllerSocketProtocol.cpp: ########## @@ -95,40 +151,29 @@ void ControllerSocketProtocol::initialize() { configuration_->get(Configuration::controller_socket_host, host); std::string port; + stopListener(); if (configuration_->get(Configuration::controller_socket_port, port)) { - if (nullptr != secure_context) { -#ifdef OPENSSL_SUPPORT - // if there is no openssl support we won't be using SSL - auto tls_context = std::make_shared<io::TLSContext>(configuration_, secure_context); - server_socket_ = std::unique_ptr<io::BaseServerSocket>(new io::TLSServerSocket(tls_context, host, std::stoi(port), 2)); -#else - server_socket_ = std::unique_ptr<io::BaseServerSocket>(new io::ServerSocket(nullptr, host, std::stoi(port), 2)); -#endif - } else { - server_socket_ = std::unique_ptr<io::BaseServerSocket>(new io::ServerSocket(nullptr, host, std::stoi(port), 2)); - } - // if we have a localhost hostname and we did not manually specify any.interface we will - // bind only to the loopback adapter + // if we have a localhost hostname and we did not manually specify any.interface we will bind only to the loopback adapter if ((host == "localhost" || host == "127.0.0.1" || host == "::") && !any_interface) { - server_socket_->initialize(true); + acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(io_context_, asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), std::stoi(port))); } else { - server_socket_->initialize(false); + acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(io_context_, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), std::stoi(port))); } - auto check = [this]() -> bool { - return update_sink_.isRunning(); - }; - - auto handler = [this](io::BaseStream *stream) { - handleCommand(stream); - }; - server_socket_->registerCallback(check, handler); + if (secure_context) { + co_spawn(io_context_, startAcceptSsl(std::move(secure_context)), asio::detached); + } else { + co_spawn(io_context_, startAccept(), asio::detached); + } + server_thread_ = std::thread([this] { + io_context_.run(); + }); Review Comment: Can we share the io_context with c2? ########## libminifi/src/io/InputStream.cpp: ########## @@ -71,18 +67,20 @@ size_t InputStream::read(std::string &str, bool widen) { return length_return; } - std::vector<std::byte> buffer(string_length); - const auto read_return = read(buffer); - if (read_return != string_length) { - return read_return; + str.clear(); + str.reserve(string_length); + + auto bytes_to_read = string_length; + while (bytes_to_read > 0) { + std::vector<std::byte> buffer(bytes_to_read); + const auto read_return = read(buffer); + if (io::isError(read_return)) + return read_return; + bytes_to_read -= read_return; + str.append(std::string(reinterpret_cast<const char*>(buffer.data()), read_return)); Review Comment: We could retry a couple of times on read_return=0 before bailing. Not sure if it changes anything in practice. ########## libminifi/src/c2/ControllerSocketProtocol.cpp: ########## @@ -325,33 +370,33 @@ void ControllerSocketProtocol::handleDescribe(io::BaseStream *stream) { } } -void ControllerSocketProtocol::handleCommand(io::BaseStream *stream) { +asio::awaitable<void> ControllerSocketProtocol::handleCommand(std::unique_ptr<io::BaseStream>&& stream) { Review Comment: What do we get by making this an equivalent coroutine? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org