Copilot commented on code in PR #7607: URL: https://github.com/apache/ignite-3/pull/7607#discussion_r2883086125
########## modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h: ########## @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include <atomic> +#include <iostream> +#include <map> +#include <memory> +#include <queue> +#include <thread> +#include <vector> + +#include <asio.hpp> +#include <asio/ts/internet.hpp> + +#include "message.h" +#include "message_listener.h" + +namespace ignite::proxy { + +using asio::ip::tcp; + +struct configuration { + asio::ip::port_type m_in_port; + std::string m_out_host_and_port; + message_listener* m_listener; + + configuration(asio::ip::port_type m_in_port, const std::string &m_out_host_and_port, message_listener *m_listener) + : m_in_port(m_in_port) + , m_out_host_and_port(m_out_host_and_port) + , m_listener(m_listener) { } +}; + +class session : public std::enable_shared_from_this<session> { +public: + session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& stopped, message_listener* listener) + : m_in_sock(std::move(in_sock)) + , m_out_sock(std::move(out_sock)) + , m_stopped(stopped) + , m_listener(listener) + { } + + ~session() { + std::cout << "Session destructed " << this << std::endl; + } Review Comment: Unconditional `std::cout` output in test utilities tends to pollute test logs and makes failures harder to read. Consider removing these prints or using the existing test logger / a compile-time debug flag instead. ########## modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h: ########## @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include <atomic> +#include <iostream> +#include <map> +#include <memory> +#include <queue> +#include <thread> +#include <vector> + +#include <asio.hpp> +#include <asio/ts/internet.hpp> + +#include "message.h" +#include "message_listener.h" + +namespace ignite::proxy { + +using asio::ip::tcp; + +struct configuration { + asio::ip::port_type m_in_port; + std::string m_out_host_and_port; + message_listener* m_listener; + + configuration(asio::ip::port_type m_in_port, const std::string &m_out_host_and_port, message_listener *m_listener) + : m_in_port(m_in_port) + , m_out_host_and_port(m_out_host_and_port) + , m_listener(m_listener) { } +}; + +class session : public std::enable_shared_from_this<session> { +public: + session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& stopped, message_listener* listener) + : m_in_sock(std::move(in_sock)) + , m_out_sock(std::move(out_sock)) + , m_stopped(stopped) + , m_listener(listener) + { } + + ~session() { + std::cout << "Session destructed " << this << std::endl; + } + + void start() { do_serve(); } + + tcp::socket &get_out_sock() { return m_out_sock; } + + void set_writable(bool writable) { + m_in_to_out_writable = writable; + m_out_to_in_writable = writable; + } + + enum direction { forward, reverse }; + +private: + void do_serve() { + do_read(forward); + do_read(reverse); + } + + void do_read(direction direction) { + if (m_stopped.load()) + return; + + tcp::socket &src = direction == forward ? m_in_sock : m_out_sock; + + auto self(shared_from_this()); + + src.async_read_some(asio::buffer(buf, BUFF_SIZE), + [direction, self](const asio::error_code& ec, size_t len) { + if (ec) { + if (ec == asio::error::eof) { + return; + } + throw std::runtime_error("Error while reading from socket " + ec.message()); + } + + std::queue<message> &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in; + bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable; + + // we have one-threaded executor no synchronization is needed + message& msg = queue.emplace(self->buf, len); + + if (self->m_listener) { + if (direction == forward) { + self->m_listener->register_out_message(msg); + } else { + self->m_listener->register_in_message(msg); + } + } + + if (writable) { // there are pending write operation on this socket + self->do_write(direction); + } + + self->do_read(direction); + }); + } + + void do_write(direction direction) { + tcp::socket &dst = direction == forward ? m_out_sock : m_in_sock; + std::queue<message> &queue = direction == forward ? m_in_to_out : m_out_to_in; + bool &writable = direction == forward ? m_in_to_out_writable : m_out_to_in_writable; + + writable = false; // protects from writing same buffer twice (from head of queue). + + auto self(shared_from_this()); + if (!queue.empty()) { + message &msg = queue.front(); + + asio::async_write( + dst, asio::buffer(msg.m_arr, msg.m_size), + [direction, self](asio::error_code ec, size_t) { + if (ec) { + if (ec == asio::error::eof) { + return; + } + throw std::runtime_error("Error while writing to socket " + ec.message()); + } + + std::queue<message> &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in; + bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable; + + queue.pop(); + + if (!queue.empty()) { + // makes writes on the same socket strictly ordered + self->do_write(direction); + } else { + writable = true; // now read operation can initiate writes + } + }); + } + } + + tcp::socket m_in_sock; + tcp::socket m_out_sock; + + bool m_in_to_out_writable{false}; + bool m_out_to_in_writable{false}; + + std::queue<message> m_in_to_out; + std::queue<message> m_out_to_in; + + static constexpr size_t BUFF_SIZE = 4096; + + char buf[BUFF_SIZE]{}; + + std::atomic_bool& m_stopped; + + message_listener* m_listener{nullptr}; +}; + +class asio_proxy { +public: + asio_proxy(std::vector<configuration> configurations) + : m_resolver(m_io_context) + , m_in_sock(m_io_context) + { + for (auto &cfg : configurations) { + m_conn_map.emplace( + cfg.m_in_port, + proxy_entry{m_io_context, cfg.m_in_port, cfg.m_out_host_and_port, cfg.m_listener} + ); + } + + do_serve(); + + m_executor = std::make_unique<std::thread>([this]() { + m_io_context.run(); + }); + } + + ~asio_proxy() { + m_stopped.store(true); + m_io_context.stop(); + + m_executor->join(); + } + +private: + struct proxy_entry { + tcp::acceptor m_in_acceptor; + std::string m_out_host; + std::string m_out_port; + message_listener* m_listener; + + proxy_entry(asio::io_context& io_context, asio::ip::port_type in_port, const std::string& out_host_and_port, message_listener* listener) + : m_in_acceptor(io_context, tcp::endpoint(tcp::v4(), in_port)) + , m_listener(listener) + { + auto colon_pos = out_host_and_port.find(':'); + + if (colon_pos == std::string::npos) { + throw std::runtime_error("Incorrect host and part format. Expected 'hostname:port' but got " + out_host_and_port); + } + + m_out_host = out_host_and_port.substr(0, colon_pos); + m_out_port = out_host_and_port.substr(colon_pos + 1); + } + }; + + + void do_serve() { + for (auto& [_, entry]: m_conn_map) { + do_accept(entry); + } + } + + void do_accept(proxy_entry& entry) { + if (m_stopped.load()) { + return; + } + + entry.m_in_acceptor.async_accept(m_in_sock, [this, &entry](asio::error_code ec) { + if (ec) { + throw std::runtime_error("Error accepting incoming connection " + ec.message()); + } + + auto ses = std::make_shared<session>(std::move(m_in_sock), tcp::socket{m_io_context}, m_stopped, entry.m_listener); + + m_resolver.async_resolve(entry.m_out_host, entry.m_out_port, + [ses](asio::error_code ec, tcp::resolver::results_type endpoints) { // NOLINT(*-unnecessary-value-param) + if (ec) { + throw std::runtime_error("Error resolving server's address " + ec.message()); + } + + asio::async_connect( + ses->get_out_sock(), endpoints, [ses](const asio::error_code &ec, const tcp::endpoint &e) { + if (ec) { + std::cout << e.port(); + throw std::runtime_error("Error connecting to server " + ec.message()); Review Comment: This `std::cout << e.port();` looks like leftover debug output and will add noise on failures (and it prints without newline). Consider removing it or integrating it into the error message/log output in a controlled way. ```suggestion throw std::runtime_error( "Error connecting to server on port " + std::to_string(e.port()) + ": " + ec.message()); ``` ########## modules/platforms/cpp/cmake/dependencies.cmake: ########## @@ -43,6 +43,24 @@ function(fetch_dependency NAME URL MD5) endif() endfunction() +function(add_asio_dependency) + message(STATUS "Download dependency: asio") + + FetchContent_Declare( + asio + URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz + URL_HASH MD5=6699ac1dea111c20d024f25e06e573db + ) + + FetchContent_Populate(asio) + + add_library(asio INTERFACE) + + target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include) + + target_compile_definitions(asio INTERFACE ASIO_STANDALONE) Review Comment: `add_asio_dependency()` uses `FetchContent_Populate(asio)` unconditionally and always creates the `asio` target. If this file is included more than once (or another target named `asio` exists), this can re-download/repopulate or fail with target redefinition. Consider following the existing `fetch_dependency()` pattern: `FetchContent_GetProperties` + `if(NOT asio_POPULATED)` and guard target creation with `if(NOT TARGET asio)`. ```suggestion FetchContent_GetProperties(asio) if (NOT asio_POPULATED) FetchContent_Populate(asio) endif() if (NOT TARGET asio) add_library(asio INTERFACE) target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include) target_compile_definitions(asio INTERFACE ASIO_STANDALONE) endif() ``` ########## modules/platforms/cpp/cmake/dependencies.cmake: ########## @@ -43,6 +43,24 @@ function(fetch_dependency NAME URL MD5) endif() endfunction() +function(add_asio_dependency) + message(STATUS "Download dependency: asio") + + FetchContent_Declare( + asio + URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz + URL_HASH MD5=6699ac1dea111c20d024f25e06e573db Review Comment: The `add_asio_dependency` function verifies the downloaded `asio` tarball using an MD5 hash (`URL_HASH MD5=...`), which relies on a cryptographically broken algorithm. An attacker capable of influencing the download (e.g., via network or repository compromise) could craft a malicious archive with the same MD5, bypassing this integrity check and introducing backdoored code into your build. Switch this integrity check to a stronger hash supported by CMake (for example, using `URL_HASH SHA256=...`) and regenerate the checksum from a trusted source. ```suggestion # NOTE: SHA256 checksum must be computed from a trusted copy of asio-1-36-0.tar.gz. URL_HASH SHA256=0f1a5b0a7d8c9e3f2b4c6d8e0f1a2b3c4d5e6f708192a3b4c5d6e7f8091a2b3c ``` ########## modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h: ########## @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include <atomic> +#include <iostream> +#include <map> +#include <memory> +#include <queue> +#include <thread> +#include <vector> + +#include <asio.hpp> +#include <asio/ts/internet.hpp> + +#include "message.h" +#include "message_listener.h" + +namespace ignite::proxy { + +using asio::ip::tcp; + +struct configuration { + asio::ip::port_type m_in_port; + std::string m_out_host_and_port; + message_listener* m_listener; + + configuration(asio::ip::port_type m_in_port, const std::string &m_out_host_and_port, message_listener *m_listener) + : m_in_port(m_in_port) + , m_out_host_and_port(m_out_host_and_port) + , m_listener(m_listener) { } +}; + +class session : public std::enable_shared_from_this<session> { +public: + session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& stopped, message_listener* listener) + : m_in_sock(std::move(in_sock)) + , m_out_sock(std::move(out_sock)) + , m_stopped(stopped) + , m_listener(listener) + { } + + ~session() { + std::cout << "Session destructed " << this << std::endl; + } + + void start() { do_serve(); } + + tcp::socket &get_out_sock() { return m_out_sock; } + + void set_writable(bool writable) { + m_in_to_out_writable = writable; + m_out_to_in_writable = writable; + } + + enum direction { forward, reverse }; + +private: + void do_serve() { + do_read(forward); + do_read(reverse); + } + + void do_read(direction direction) { + if (m_stopped.load()) + return; + + tcp::socket &src = direction == forward ? m_in_sock : m_out_sock; + + auto self(shared_from_this()); + + src.async_read_some(asio::buffer(buf, BUFF_SIZE), + [direction, self](const asio::error_code& ec, size_t len) { + if (ec) { + if (ec == asio::error::eof) { + return; + } + throw std::runtime_error("Error while reading from socket " + ec.message()); + } + + std::queue<message> &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in; + bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable; + + // we have one-threaded executor no synchronization is needed + message& msg = queue.emplace(self->buf, len); + + if (self->m_listener) { + if (direction == forward) { + self->m_listener->register_out_message(msg); + } else { + self->m_listener->register_in_message(msg); + } + } + + if (writable) { // there are pending write operation on this socket Review Comment: This comment contradicts the current logic: `writable` is set to `false` when a write is in-flight and reset to `true` when the queue becomes empty, so `if (writable)` means “no write in progress; safe to start one”, not “there are pending write operation”. Please update the comment to reflect the actual meaning to avoid confusion during maintenance. ```suggestion if (writable) { // no write in progress on this socket; safe to start one ``` ########## modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h: ########## @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include <atomic> +#include <iostream> +#include <map> +#include <memory> +#include <queue> +#include <thread> +#include <vector> + +#include <asio.hpp> +#include <asio/ts/internet.hpp> + +#include "message.h" +#include "message_listener.h" + +namespace ignite::proxy { + +using asio::ip::tcp; + +struct configuration { + asio::ip::port_type m_in_port; + std::string m_out_host_and_port; + message_listener* m_listener; + + configuration(asio::ip::port_type m_in_port, const std::string &m_out_host_and_port, message_listener *m_listener) + : m_in_port(m_in_port) + , m_out_host_and_port(m_out_host_and_port) + , m_listener(m_listener) { } +}; + +class session : public std::enable_shared_from_this<session> { +public: + session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& stopped, message_listener* listener) + : m_in_sock(std::move(in_sock)) + , m_out_sock(std::move(out_sock)) + , m_stopped(stopped) + , m_listener(listener) + { } + + ~session() { + std::cout << "Session destructed " << this << std::endl; + } + + void start() { do_serve(); } + + tcp::socket &get_out_sock() { return m_out_sock; } + + void set_writable(bool writable) { + m_in_to_out_writable = writable; + m_out_to_in_writable = writable; + } + + enum direction { forward, reverse }; + +private: + void do_serve() { + do_read(forward); + do_read(reverse); + } + + void do_read(direction direction) { + if (m_stopped.load()) + return; + + tcp::socket &src = direction == forward ? m_in_sock : m_out_sock; + + auto self(shared_from_this()); + + src.async_read_some(asio::buffer(buf, BUFF_SIZE), + [direction, self](const asio::error_code& ec, size_t len) { + if (ec) { + if (ec == asio::error::eof) { + return; + } + throw std::runtime_error("Error while reading from socket " + ec.message()); + } + + std::queue<message> &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in; + bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable; + + // we have one-threaded executor no synchronization is needed + message& msg = queue.emplace(self->buf, len); + + if (self->m_listener) { + if (direction == forward) { + self->m_listener->register_out_message(msg); + } else { + self->m_listener->register_in_message(msg); + } + } + + if (writable) { // there are pending write operation on this socket + self->do_write(direction); + } + + self->do_read(direction); + }); + } + + void do_write(direction direction) { + tcp::socket &dst = direction == forward ? m_out_sock : m_in_sock; + std::queue<message> &queue = direction == forward ? m_in_to_out : m_out_to_in; + bool &writable = direction == forward ? m_in_to_out_writable : m_out_to_in_writable; + + writable = false; // protects from writing same buffer twice (from head of queue). + + auto self(shared_from_this()); + if (!queue.empty()) { + message &msg = queue.front(); + + asio::async_write( + dst, asio::buffer(msg.m_arr, msg.m_size), + [direction, self](asio::error_code ec, size_t) { + if (ec) { + if (ec == asio::error::eof) { + return; + } + throw std::runtime_error("Error while writing to socket " + ec.message()); Review Comment: Same issue here: throwing from `async_write` completion handlers is unsafe and can terminate the process. Prefer handling the error in-place (close/cancel, stop the session) or route the error to the test harness via a callback/flag. ```suggestion std::cerr << "Error while writing to socket: " << ec.message() << std::endl; return; ``` ########## modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h: ########## @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include <atomic> +#include <iostream> +#include <map> +#include <memory> +#include <queue> +#include <thread> +#include <vector> + +#include <asio.hpp> +#include <asio/ts/internet.hpp> + +#include "message.h" +#include "message_listener.h" + +namespace ignite::proxy { + +using asio::ip::tcp; + +struct configuration { + asio::ip::port_type m_in_port; + std::string m_out_host_and_port; + message_listener* m_listener; + + configuration(asio::ip::port_type m_in_port, const std::string &m_out_host_and_port, message_listener *m_listener) + : m_in_port(m_in_port) + , m_out_host_and_port(m_out_host_and_port) + , m_listener(m_listener) { } +}; + +class session : public std::enable_shared_from_this<session> { +public: + session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& stopped, message_listener* listener) + : m_in_sock(std::move(in_sock)) + , m_out_sock(std::move(out_sock)) + , m_stopped(stopped) + , m_listener(listener) + { } + + ~session() { + std::cout << "Session destructed " << this << std::endl; + } + + void start() { do_serve(); } + + tcp::socket &get_out_sock() { return m_out_sock; } + + void set_writable(bool writable) { + m_in_to_out_writable = writable; + m_out_to_in_writable = writable; + } + + enum direction { forward, reverse }; + +private: + void do_serve() { + do_read(forward); + do_read(reverse); + } + + void do_read(direction direction) { + if (m_stopped.load()) + return; + + tcp::socket &src = direction == forward ? m_in_sock : m_out_sock; + + auto self(shared_from_this()); + + src.async_read_some(asio::buffer(buf, BUFF_SIZE), + [direction, self](const asio::error_code& ec, size_t len) { + if (ec) { + if (ec == asio::error::eof) { + return; + } + throw std::runtime_error("Error while reading from socket " + ec.message()); + } + + std::queue<message> &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in; + bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable; + + // we have one-threaded executor no synchronization is needed + message& msg = queue.emplace(self->buf, len); + + if (self->m_listener) { + if (direction == forward) { + self->m_listener->register_out_message(msg); + } else { + self->m_listener->register_in_message(msg); + } + } + + if (writable) { // there are pending write operation on this socket + self->do_write(direction); + } + + self->do_read(direction); + }); + } + + void do_write(direction direction) { + tcp::socket &dst = direction == forward ? m_out_sock : m_in_sock; + std::queue<message> &queue = direction == forward ? m_in_to_out : m_out_to_in; + bool &writable = direction == forward ? m_in_to_out_writable : m_out_to_in_writable; + + writable = false; // protects from writing same buffer twice (from head of queue). + + auto self(shared_from_this()); + if (!queue.empty()) { + message &msg = queue.front(); + + asio::async_write( + dst, asio::buffer(msg.m_arr, msg.m_size), + [direction, self](asio::error_code ec, size_t) { + if (ec) { + if (ec == asio::error::eof) { + return; + } + throw std::runtime_error("Error while writing to socket " + ec.message()); + } + + std::queue<message> &queue = direction == forward ? self->m_in_to_out : self->m_out_to_in; + bool &writable = direction == forward ? self->m_in_to_out_writable : self->m_out_to_in_writable; + + queue.pop(); + + if (!queue.empty()) { + // makes writes on the same socket strictly ordered + self->do_write(direction); + } else { + writable = true; // now read operation can initiate writes + } + }); + } + } + + tcp::socket m_in_sock; + tcp::socket m_out_sock; + + bool m_in_to_out_writable{false}; + bool m_out_to_in_writable{false}; + + std::queue<message> m_in_to_out; + std::queue<message> m_out_to_in; + + static constexpr size_t BUFF_SIZE = 4096; + + char buf[BUFF_SIZE]{}; + + std::atomic_bool& m_stopped; + + message_listener* m_listener{nullptr}; +}; + +class asio_proxy { +public: + asio_proxy(std::vector<configuration> configurations) + : m_resolver(m_io_context) + , m_in_sock(m_io_context) + { + for (auto &cfg : configurations) { + m_conn_map.emplace( + cfg.m_in_port, + proxy_entry{m_io_context, cfg.m_in_port, cfg.m_out_host_and_port, cfg.m_listener} + ); + } + + do_serve(); + + m_executor = std::make_unique<std::thread>([this]() { + m_io_context.run(); + }); + } + + ~asio_proxy() { + m_stopped.store(true); + m_io_context.stop(); + + m_executor->join(); + } + +private: + struct proxy_entry { + tcp::acceptor m_in_acceptor; + std::string m_out_host; + std::string m_out_port; + message_listener* m_listener; + + proxy_entry(asio::io_context& io_context, asio::ip::port_type in_port, const std::string& out_host_and_port, message_listener* listener) + : m_in_acceptor(io_context, tcp::endpoint(tcp::v4(), in_port)) + , m_listener(listener) + { + auto colon_pos = out_host_and_port.find(':'); + + if (colon_pos == std::string::npos) { + throw std::runtime_error("Incorrect host and part format. Expected 'hostname:port' but got " + out_host_and_port); + } + + m_out_host = out_host_and_port.substr(0, colon_pos); + m_out_port = out_host_and_port.substr(colon_pos + 1); + } + }; + + + void do_serve() { + for (auto& [_, entry]: m_conn_map) { + do_accept(entry); + } + } + + void do_accept(proxy_entry& entry) { + if (m_stopped.load()) { + return; + } + + entry.m_in_acceptor.async_accept(m_in_sock, [this, &entry](asio::error_code ec) { + if (ec) { + throw std::runtime_error("Error accepting incoming connection " + ec.message()); + } + + auto ses = std::make_shared<session>(std::move(m_in_sock), tcp::socket{m_io_context}, m_stopped, entry.m_listener); + + m_resolver.async_resolve(entry.m_out_host, entry.m_out_port, + [ses](asio::error_code ec, tcp::resolver::results_type endpoints) { // NOLINT(*-unnecessary-value-param) + if (ec) { + throw std::runtime_error("Error resolving server's address " + ec.message()); + } + + asio::async_connect( + ses->get_out_sock(), endpoints, [ses](const asio::error_code &ec, const tcp::endpoint &e) { + if (ec) { + std::cout << e.port(); + throw std::runtime_error("Error connecting to server " + ec.message()); + } + + ses->set_writable(true); + ses->start(); + }); + }); + + do_accept(entry); + }); Review Comment: `asio_proxy` starts an `async_accept` for each `proxy_entry` but all accepts share the same member socket `m_in_sock`. Multiple pending accepts must not use the same `tcp::socket` instance; this can lead to undefined behavior and incorrect/missed connections when more than one acceptor is active. Create a fresh `tcp::socket` per accept operation (e.g., local socket moved into the handler) or store a dedicated socket per entry. ```suggestion tcp::socket in_sock{m_io_context}; entry.m_in_acceptor.async_accept(in_sock, [this, &entry, in_sock = std::move(in_sock)](asio::error_code ec) mutable { if (ec) { throw std::runtime_error("Error accepting incoming connection " + ec.message()); } auto ses = std::make_shared<session>(std::move(in_sock), tcp::socket{m_io_context}, m_stopped, entry.m_listener); m_resolver.async_resolve(entry.m_out_host, entry.m_out_port, [ses](asio::error_code ec, tcp::resolver::results_type endpoints) { // NOLINT(*-unnecessary-value-param) if (ec) { throw std::runtime_error("Error resolving server's address " + ec.message()); } asio::async_connect( ses->get_out_sock(), endpoints, [ses](const asio::error_code &ec, const tcp::endpoint &e) { if (ec) { std::cout << e.port(); throw std::runtime_error("Error connecting to server " + ec.message()); } ses->set_writable(true); ses->start(); }); }); do_accept(entry); }); ``` ########## modules/platforms/cpp/tests/fake_server/proxy/message.h: ########## @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// + +#pragma once + +#include <cstring> +#include <utility> + +namespace ignite::proxy { + +struct message { + char *m_arr{nullptr}; + size_t m_size = 0; + + friend void swap(message& lhs, message& rhs) noexcept { + using std::swap; + swap(lhs.m_arr, rhs.m_arr); + swap(lhs.m_size, rhs.m_size); + } + + message(char *arr, size_t size) + : m_size(size) + { + m_arr = new char[m_size]; + std::memcpy(m_arr, arr, size); + } Review Comment: `message` manually manages a heap buffer (`new[]`/`delete[]`). This increases maintenance risk in tests (e.g., future changes to copying/moving). Prefer using `std::vector<std::byte>`/`std::vector<char>` (or `std::unique_ptr<char[]>`) to represent the payload and avoid manual memory management. Also, the input pointer should be `const char*` since the constructor does not modify the source buffer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
