Copilot commented on code in PR #7607:
URL: https://github.com/apache/ignite-3/pull/7607#discussion_r2883086125


##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <atomic>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <vector>
+
+#include <asio.hpp>
+#include <asio/ts/internet.hpp>
+
+#include "message.h"
+#include "message_listener.h"
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct configuration {
+    asio::ip::port_type m_in_port;
+    std::string m_out_host_and_port;
+    message_listener* m_listener;
+
+    configuration(asio::ip::port_type m_in_port, const std::string 
&m_out_host_and_port, message_listener *m_listener)
+        : m_in_port(m_in_port)
+        , m_out_host_and_port(m_out_host_and_port)
+        , m_listener(m_listener) { }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& 
stopped, message_listener* listener)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock))
+        , m_stopped(stopped)
+        , m_listener(listener)
+    { }
+
+    ~session() {
+        std::cout << "Session destructed " << this <<  std::endl;
+    }

Review Comment:
   Unconditional `std::cout` output in test utilities tends to pollute test 
logs and makes failures harder to read. Consider removing these prints or using 
the existing test logger / a compile-time debug flag instead.



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <atomic>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <vector>
+
+#include <asio.hpp>
+#include <asio/ts/internet.hpp>
+
+#include "message.h"
+#include "message_listener.h"
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct configuration {
+    asio::ip::port_type m_in_port;
+    std::string m_out_host_and_port;
+    message_listener* m_listener;
+
+    configuration(asio::ip::port_type m_in_port, const std::string 
&m_out_host_and_port, message_listener *m_listener)
+        : m_in_port(m_in_port)
+        , m_out_host_and_port(m_out_host_and_port)
+        , m_listener(m_listener) { }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& 
stopped, message_listener* listener)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock))
+        , m_stopped(stopped)
+        , m_listener(listener)
+    { }
+
+    ~session() {
+        std::cout << "Session destructed " << this <<  std::endl;
+    }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        if (m_stopped.load())
+            return;
+
+        tcp::socket &src = direction == forward ? m_in_sock : m_out_sock;
+
+        auto self(shared_from_this());
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [direction, self](const asio::error_code& ec, size_t len) {
+            if (ec) {
+                if (ec == asio::error::eof) {
+                    return;
+                }
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            std::queue<message> &queue = direction == forward ? 
self->m_in_to_out : self->m_out_to_in;
+            bool &writable = direction == forward ? self->m_in_to_out_writable 
: self->m_out_to_in_writable;
+
+            // we have one-threaded executor no synchronization is needed
+            message& msg = queue.emplace(self->buf, len);
+
+            if (self->m_listener) {
+                if (direction == forward) {
+                    self->m_listener->register_out_message(msg);
+                } else {
+                    self->m_listener->register_in_message(msg);
+                }
+            }
+
+            if (writable) { // there are pending write operation on this socket
+                self->do_write(direction);
+            }
+
+            self->do_read(direction);
+        });
+    }
+
+    void do_write(direction direction) {
+        tcp::socket &dst = direction == forward ? m_out_sock : m_in_sock;
+        std::queue<message> &queue = direction == forward ? m_in_to_out : 
m_out_to_in;
+        bool &writable = direction == forward ? m_in_to_out_writable : 
m_out_to_in_writable;
+
+        writable = false; // protects from writing same buffer twice (from 
head of queue).
+
+        auto self(shared_from_this());
+        if (!queue.empty()) {
+            message &msg = queue.front();
+
+            asio::async_write(
+                dst, asio::buffer(msg.m_arr, msg.m_size),
+                [direction, self](asio::error_code ec, size_t) {
+                    if (ec) {
+                        if (ec == asio::error::eof) {
+                            return;
+                        }
+                        throw std::runtime_error("Error while writing to 
socket " + ec.message());
+                    }
+
+                    std::queue<message> &queue = direction == forward ? 
self->m_in_to_out : self->m_out_to_in;
+                    bool &writable = direction == forward ? 
self->m_in_to_out_writable : self->m_out_to_in_writable;
+
+                    queue.pop();
+
+                    if (!queue.empty()) {
+                        // makes writes on the same socket strictly ordered
+                        self->do_write(direction);
+                    } else {
+                        writable = true; // now read operation can initiate 
writes
+                    }
+                });
+        }
+    }
+
+    tcp::socket m_in_sock;
+    tcp::socket m_out_sock;
+
+    bool m_in_to_out_writable{false};
+    bool m_out_to_in_writable{false};
+
+    std::queue<message> m_in_to_out;
+    std::queue<message> m_out_to_in;
+
+    static constexpr size_t BUFF_SIZE = 4096;
+
+    char buf[BUFF_SIZE]{};
+
+    std::atomic_bool& m_stopped;
+
+    message_listener* m_listener{nullptr};
+};
+
+class asio_proxy {
+public:
+    asio_proxy(std::vector<configuration> configurations)
+        : m_resolver(m_io_context)
+        , m_in_sock(m_io_context)
+    {
+        for (auto &cfg : configurations) {
+            m_conn_map.emplace(
+                cfg.m_in_port,
+                proxy_entry{m_io_context, cfg.m_in_port, 
cfg.m_out_host_and_port, cfg.m_listener}
+            );
+        }
+
+        do_serve();
+
+        m_executor = std::make_unique<std::thread>([this]() {
+            m_io_context.run();
+        });
+    }
+
+    ~asio_proxy() {
+        m_stopped.store(true);
+        m_io_context.stop();
+
+        m_executor->join();
+    }
+
+private:
+    struct proxy_entry {
+        tcp::acceptor m_in_acceptor;
+        std::string m_out_host;
+        std::string m_out_port;
+        message_listener* m_listener;
+
+        proxy_entry(asio::io_context& io_context, asio::ip::port_type in_port, 
const std::string& out_host_and_port, message_listener* listener)
+            : m_in_acceptor(io_context, tcp::endpoint(tcp::v4(), in_port))
+            , m_listener(listener)
+        {
+            auto colon_pos = out_host_and_port.find(':');
+
+            if (colon_pos == std::string::npos) {
+                throw std::runtime_error("Incorrect host and part format. 
Expected 'hostname:port' but got " + out_host_and_port);
+            }
+
+            m_out_host = out_host_and_port.substr(0, colon_pos);
+            m_out_port = out_host_and_port.substr(colon_pos + 1);
+        }
+    };
+
+
+    void do_serve() {
+        for (auto& [_, entry]: m_conn_map) {
+            do_accept(entry);
+        }
+    }
+
+    void do_accept(proxy_entry& entry) {
+        if (m_stopped.load()) {
+            return;
+        }
+
+        entry.m_in_acceptor.async_accept(m_in_sock, [this, 
&entry](asio::error_code ec) {
+            if (ec) {
+                throw std::runtime_error("Error accepting incoming connection 
" + ec.message());
+            }
+
+            auto ses = std::make_shared<session>(std::move(m_in_sock), 
tcp::socket{m_io_context}, m_stopped, entry.m_listener);
+
+            m_resolver.async_resolve(entry.m_out_host, entry.m_out_port,
+                [ses](asio::error_code ec, tcp::resolver::results_type 
endpoints) { // NOLINT(*-unnecessary-value-param)
+                    if (ec) {
+                        throw std::runtime_error("Error resolving server's 
address " + ec.message());
+                    }
+
+                    asio::async_connect(
+                        ses->get_out_sock(), endpoints, [ses](const 
asio::error_code &ec, const tcp::endpoint &e) {
+                            if (ec) {
+                                std::cout << e.port();
+                                throw std::runtime_error("Error connecting to 
server " + ec.message());

Review Comment:
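   This `std::cout << e.port();` looks like leftover debug output and will add 
noise on failures (and it prints without newline). Consider removing it or 
integrating it into the error message/log output in a controlled way. Note that 
on failure `asio::async_connect` passes a default-constructed endpoint to the 
handler, so `e.port()` is 0 in this branch; to report a meaningful port, 
capture the configured target port and use that instead.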
   ```suggestion
                                   throw std::runtime_error(
                                       "Error connecting to server on port " + 
std::to_string(e.port()) + ": " + ec.message());
   ```



##########
modules/platforms/cpp/cmake/dependencies.cmake:
##########
@@ -43,6 +43,24 @@ function(fetch_dependency NAME URL MD5)
     endif()
 endfunction()
 
+function(add_asio_dependency)
+    message(STATUS "Download dependency: asio")
+
+    FetchContent_Declare(
+            asio
+            URL 
https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz
+            URL_HASH MD5=6699ac1dea111c20d024f25e06e573db
+    )
+
+    FetchContent_Populate(asio)
+
+    add_library(asio INTERFACE)
+
+    target_include_directories(asio INTERFACE ${asio_SOURCE_DIR}/asio/include)
+
+    target_compile_definitions(asio INTERFACE ASIO_STANDALONE)

Review Comment:
   `add_asio_dependency()` uses `FetchContent_Populate(asio)` unconditionally 
and always creates the `asio` target. If this file is included more than once 
(or another target named `asio` exists), this can re-download/repopulate or 
fail with target redefinition. Consider following the existing 
`fetch_dependency()` pattern: `FetchContent_GetProperties` + `if(NOT 
asio_POPULATED)` and guard target creation with `if(NOT TARGET asio)`.
   ```suggestion
       FetchContent_GetProperties(asio)
       if (NOT asio_POPULATED)
           FetchContent_Populate(asio)
       endif()
   
       if (NOT TARGET asio)
           add_library(asio INTERFACE)
   
           target_include_directories(asio INTERFACE 
${asio_SOURCE_DIR}/asio/include)
   
           target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
       endif()
   ```



##########
modules/platforms/cpp/cmake/dependencies.cmake:
##########
@@ -43,6 +43,24 @@ function(fetch_dependency NAME URL MD5)
     endif()
 endfunction()
 
+function(add_asio_dependency)
+    message(STATUS "Download dependency: asio")
+
+    FetchContent_Declare(
+            asio
+            URL 
https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz
+            URL_HASH MD5=6699ac1dea111c20d024f25e06e573db

Review Comment:
   The `add_asio_dependency` function verifies the downloaded `asio` tarball 
using an MD5 hash (`URL_HASH MD5=...`), which relies on a cryptographically 
broken algorithm. An attacker capable of influencing the download (e.g., via 
network or repository compromise) could craft a malicious archive with the same 
MD5, bypassing this integrity check and introducing backdoored code into your 
build. Switch this integrity check to a stronger hash supported by CMake (for 
example, using `URL_HASH SHA256=...`) and regenerate the checksum from a 
trusted source.
   ```suggestion
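               # NOTE: the SHA256 value below is an illustrative placeholder, 
not a verified checksum. Replace it with the SHA256 computed from a trusted 
copy of asio-1-36-0.tar.gz before applying this suggestion.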
               URL_HASH 
SHA256=0f1a5b0a7d8c9e3f2b4c6d8e0f1a2b3c4d5e6f708192a3b4c5d6e7f8091a2b3c
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <atomic>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <vector>
+
+#include <asio.hpp>
+#include <asio/ts/internet.hpp>
+
+#include "message.h"
+#include "message_listener.h"
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct configuration {
+    asio::ip::port_type m_in_port;
+    std::string m_out_host_and_port;
+    message_listener* m_listener;
+
+    configuration(asio::ip::port_type m_in_port, const std::string 
&m_out_host_and_port, message_listener *m_listener)
+        : m_in_port(m_in_port)
+        , m_out_host_and_port(m_out_host_and_port)
+        , m_listener(m_listener) { }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& 
stopped, message_listener* listener)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock))
+        , m_stopped(stopped)
+        , m_listener(listener)
+    { }
+
+    ~session() {
+        std::cout << "Session destructed " << this <<  std::endl;
+    }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        if (m_stopped.load())
+            return;
+
+        tcp::socket &src = direction == forward ? m_in_sock : m_out_sock;
+
+        auto self(shared_from_this());
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [direction, self](const asio::error_code& ec, size_t len) {
+            if (ec) {
+                if (ec == asio::error::eof) {
+                    return;
+                }
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            std::queue<message> &queue = direction == forward ? 
self->m_in_to_out : self->m_out_to_in;
+            bool &writable = direction == forward ? self->m_in_to_out_writable 
: self->m_out_to_in_writable;
+
+            // we have one-threaded executor no synchronization is needed
+            message& msg = queue.emplace(self->buf, len);
+
+            if (self->m_listener) {
+                if (direction == forward) {
+                    self->m_listener->register_out_message(msg);
+                } else {
+                    self->m_listener->register_in_message(msg);
+                }
+            }
+
+            if (writable) { // there are pending write operation on this socket

Review Comment:
   This comment contradicts the current logic: `writable` is set to `false` 
when a write is in-flight and reset to `true` when the queue becomes empty, so 
`if (writable)` means “no write in progress; safe to start one”, not “there are 
pending write operation”. Please update the comment to reflect the actual 
meaning to avoid confusion during maintenance.
   ```suggestion
               if (writable) { // no write in progress on this socket; safe to 
start one
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <atomic>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <vector>
+
+#include <asio.hpp>
+#include <asio/ts/internet.hpp>
+
+#include "message.h"
+#include "message_listener.h"
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct configuration {
+    asio::ip::port_type m_in_port;
+    std::string m_out_host_and_port;
+    message_listener* m_listener;
+
+    configuration(asio::ip::port_type m_in_port, const std::string 
&m_out_host_and_port, message_listener *m_listener)
+        : m_in_port(m_in_port)
+        , m_out_host_and_port(m_out_host_and_port)
+        , m_listener(m_listener) { }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& 
stopped, message_listener* listener)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock))
+        , m_stopped(stopped)
+        , m_listener(listener)
+    { }
+
+    ~session() {
+        std::cout << "Session destructed " << this <<  std::endl;
+    }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        if (m_stopped.load())
+            return;
+
+        tcp::socket &src = direction == forward ? m_in_sock : m_out_sock;
+
+        auto self(shared_from_this());
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [direction, self](const asio::error_code& ec, size_t len) {
+            if (ec) {
+                if (ec == asio::error::eof) {
+                    return;
+                }
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            std::queue<message> &queue = direction == forward ? 
self->m_in_to_out : self->m_out_to_in;
+            bool &writable = direction == forward ? self->m_in_to_out_writable 
: self->m_out_to_in_writable;
+
+            // we have one-threaded executor no synchronization is needed
+            message& msg = queue.emplace(self->buf, len);
+
+            if (self->m_listener) {
+                if (direction == forward) {
+                    self->m_listener->register_out_message(msg);
+                } else {
+                    self->m_listener->register_in_message(msg);
+                }
+            }
+
+            if (writable) { // there are pending write operation on this socket
+                self->do_write(direction);
+            }
+
+            self->do_read(direction);
+        });
+    }
+
+    void do_write(direction direction) {
+        tcp::socket &dst = direction == forward ? m_out_sock : m_in_sock;
+        std::queue<message> &queue = direction == forward ? m_in_to_out : 
m_out_to_in;
+        bool &writable = direction == forward ? m_in_to_out_writable : 
m_out_to_in_writable;
+
+        writable = false; // protects from writing same buffer twice (from 
head of queue).
+
+        auto self(shared_from_this());
+        if (!queue.empty()) {
+            message &msg = queue.front();
+
+            asio::async_write(
+                dst, asio::buffer(msg.m_arr, msg.m_size),
+                [direction, self](asio::error_code ec, size_t) {
+                    if (ec) {
+                        if (ec == asio::error::eof) {
+                            return;
+                        }
+                        throw std::runtime_error("Error while writing to 
socket " + ec.message());

Review Comment:
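   Same issue here: throwing from `async_write` completion handlers is unsafe 
and can terminate the process. The exception propagates out of 
`m_io_context.run()` on the executor thread, which has no `try`/`catch`, so 
`std::terminate` is called. Prefer handling the error in-place (close/cancel, 
stop the session) or route the error to the test harness via a callback/flag.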
   ```suggestion
                           std::cerr << "Error while writing to socket: " << 
ec.message() << std::endl;
                           return;
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h:
##########
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <atomic>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <vector>
+
+#include <asio.hpp>
+#include <asio/ts/internet.hpp>
+
+#include "message.h"
+#include "message_listener.h"
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+struct configuration {
+    asio::ip::port_type m_in_port;
+    std::string m_out_host_and_port;
+    message_listener* m_listener;
+
+    configuration(asio::ip::port_type m_in_port, const std::string 
&m_out_host_and_port, message_listener *m_listener)
+        : m_in_port(m_in_port)
+        , m_out_host_and_port(m_out_host_and_port)
+        , m_listener(m_listener) { }
+};
+
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(tcp::socket in_sock, tcp::socket out_sock, std::atomic_bool& 
stopped, message_listener* listener)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock))
+        , m_stopped(stopped)
+        , m_listener(listener)
+    { }
+
+    ~session() {
+        std::cout << "Session destructed " << this <<  std::endl;
+    }
+
+    void start() { do_serve(); }
+
+    tcp::socket &get_out_sock() { return m_out_sock; }
+
+    void set_writable(bool writable) {
+        m_in_to_out_writable = writable;
+        m_out_to_in_writable = writable;
+    }
+
+    enum direction { forward, reverse };
+
+private:
+    void do_serve() {
+        do_read(forward);
+        do_read(reverse);
+    }
+
+    void do_read(direction direction) {
+        if (m_stopped.load())
+            return;
+
+        tcp::socket &src = direction == forward ? m_in_sock : m_out_sock;
+
+        auto self(shared_from_this());
+
+        src.async_read_some(asio::buffer(buf, BUFF_SIZE),
+    [direction, self](const asio::error_code& ec, size_t len) {
+            if (ec) {
+                if (ec == asio::error::eof) {
+                    return;
+                }
+                throw std::runtime_error("Error while reading from socket " + 
ec.message());
+            }
+
+            std::queue<message> &queue = direction == forward ? 
self->m_in_to_out : self->m_out_to_in;
+            bool &writable = direction == forward ? self->m_in_to_out_writable 
: self->m_out_to_in_writable;
+
+            // we have one-threaded executor no synchronization is needed
+            message& msg = queue.emplace(self->buf, len);
+
+            if (self->m_listener) {
+                if (direction == forward) {
+                    self->m_listener->register_out_message(msg);
+                } else {
+                    self->m_listener->register_in_message(msg);
+                }
+            }
+
+            if (writable) { // there are pending write operation on this socket
+                self->do_write(direction);
+            }
+
+            self->do_read(direction);
+        });
+    }
+
+    void do_write(direction direction) {
+        tcp::socket &dst = direction == forward ? m_out_sock : m_in_sock;
+        std::queue<message> &queue = direction == forward ? m_in_to_out : 
m_out_to_in;
+        bool &writable = direction == forward ? m_in_to_out_writable : 
m_out_to_in_writable;
+
+        writable = false; // protects from writing same buffer twice (from 
head of queue).
+
+        auto self(shared_from_this());
+        if (!queue.empty()) {
+            message &msg = queue.front();
+
+            asio::async_write(
+                dst, asio::buffer(msg.m_arr, msg.m_size),
+                [direction, self](asio::error_code ec, size_t) {
+                    if (ec) {
+                        if (ec == asio::error::eof) {
+                            return;
+                        }
+                        throw std::runtime_error("Error while writing to 
socket " + ec.message());
+                    }
+
+                    std::queue<message> &queue = direction == forward ? 
self->m_in_to_out : self->m_out_to_in;
+                    bool &writable = direction == forward ? 
self->m_in_to_out_writable : self->m_out_to_in_writable;
+
+                    queue.pop();
+
+                    if (!queue.empty()) {
+                        // makes writes on the same socket strictly ordered
+                        self->do_write(direction);
+                    } else {
+                        writable = true; // now read operation can initiate 
writes
+                    }
+                });
+        }
+    }
+
+    tcp::socket m_in_sock;
+    tcp::socket m_out_sock;
+
+    bool m_in_to_out_writable{false};
+    bool m_out_to_in_writable{false};
+
+    std::queue<message> m_in_to_out;
+    std::queue<message> m_out_to_in;
+
+    static constexpr size_t BUFF_SIZE = 4096;
+
+    char buf[BUFF_SIZE]{};
+
+    std::atomic_bool& m_stopped;
+
+    message_listener* m_listener{nullptr};
+};
+
+class asio_proxy {
+public:
+    asio_proxy(std::vector<configuration> configurations)
+        : m_resolver(m_io_context)
+        , m_in_sock(m_io_context)
+    {
+        for (auto &cfg : configurations) {
+            m_conn_map.emplace(
+                cfg.m_in_port,
+                proxy_entry{m_io_context, cfg.m_in_port, 
cfg.m_out_host_and_port, cfg.m_listener}
+            );
+        }
+
+        do_serve();
+
+        m_executor = std::make_unique<std::thread>([this]() {
+            m_io_context.run();
+        });
+    }
+
+    ~asio_proxy() {
+        m_stopped.store(true);
+        m_io_context.stop();
+
+        m_executor->join();
+    }
+
+private:
+    // Per-port proxy settings: the acceptor for incoming clients plus the
+    // target server address split into host and port strings.
+    struct proxy_entry {
+        tcp::acceptor m_in_acceptor;  // accepts connections on the incoming port
+        std::string m_out_host;       // text before the first ':' of the target address
+        std::string m_out_port;       // text after the first ':' of the target address
+        message_listener* m_listener; // passed on to each session created for this entry
+
+        // Opens an IPv4 acceptor on in_port and splits out_host_and_port at the first ':'.
+        // Throws std::runtime_error when out_host_and_port contains no ':'.
+        proxy_entry(asio::io_context& io_context, asio::ip::port_type in_port,
+            const std::string& out_host_and_port, message_listener* listener)
+            : m_in_acceptor(io_context, tcp::endpoint(tcp::v4(), in_port))
+            , m_listener(listener)
+        {
+            auto colon_pos = out_host_and_port.find(':');
+
+            if (colon_pos == std::string::npos) {
+                throw std::runtime_error(
+                    "Incorrect host and port format. Expected 'hostname:port' but got " + out_host_and_port);
+            }
+
+            m_out_host = out_host_and_port.substr(0, colon_pos);
+            m_out_port = out_host_and_port.substr(colon_pos + 1);
+        }
+    };
+
+
+    // Schedules the first asynchronous accept on every configured entry.
+    void do_serve() {
+        for (auto &port_and_entry : m_conn_map) {
+            do_accept(port_and_entry.second);
+        }
+    }
+
+    void do_accept(proxy_entry& entry) {
+        if (m_stopped.load()) {
+            return;
+        }
+
+        entry.m_in_acceptor.async_accept(m_in_sock, [this, 
&entry](asio::error_code ec) {
+            if (ec) {
+                throw std::runtime_error("Error accepting incoming connection 
" + ec.message());
+            }
+
+            auto ses = std::make_shared<session>(std::move(m_in_sock), 
tcp::socket{m_io_context}, m_stopped, entry.m_listener);
+
+            m_resolver.async_resolve(entry.m_out_host, entry.m_out_port,
+                [ses](asio::error_code ec, tcp::resolver::results_type 
endpoints) { // NOLINT(*-unnecessary-value-param)
+                    if (ec) {
+                        throw std::runtime_error("Error resolving server's 
address " + ec.message());
+                    }
+
+                    asio::async_connect(
+                        ses->get_out_sock(), endpoints, [ses](const 
asio::error_code &ec, const tcp::endpoint &e) {
+                            if (ec) {
+                                std::cout << e.port();
+                                throw std::runtime_error("Error connecting to 
server " + ec.message());
+                            }
+
+                            ses->set_writable(true);
+                            ses->start();
+                        });
+                });
+
+            do_accept(entry);
+        });

Review Comment:
   `asio_proxy` starts an `async_accept` for each `proxy_entry` but all accepts 
share the same member socket `m_in_sock`. Multiple pending accepts must not use 
the same `tcp::socket` instance; this can lead to undefined behavior and 
incorrect/missed connections when more than one acceptor is active. Create a 
fresh `tcp::socket` per accept operation (e.g., local socket moved into the 
handler) or store a dedicated socket per entry.
   ```suggestion
           tcp::socket in_sock{m_io_context};
   
           entry.m_in_acceptor.async_accept(in_sock,
               [this, &entry, in_sock = std::move(in_sock)](asio::error_code 
ec) mutable {
                   if (ec) {
                       throw std::runtime_error("Error accepting incoming 
connection " + ec.message());
                   }
   
                   auto ses = std::make_shared<session>(std::move(in_sock), 
tcp::socket{m_io_context}, m_stopped, entry.m_listener);
   
                   m_resolver.async_resolve(entry.m_out_host, entry.m_out_port,
                       [ses](asio::error_code ec, tcp::resolver::results_type 
endpoints) { // NOLINT(*-unnecessary-value-param)
                           if (ec) {
                               throw std::runtime_error("Error resolving 
server's address " + ec.message());
                           }
   
                           asio::async_connect(
                               ses->get_out_sock(), endpoints, [ses](const 
asio::error_code &ec, const tcp::endpoint &e) {
                                   if (ec) {
                                       std::cout << e.port();
                                       throw std::runtime_error("Error 
connecting to server " + ec.message());
                                   }
   
                                   ses->set_writable(true);
                                   ses->start();
                               });
                       });
   
                   do_accept(entry);
               });
   ```



##########
modules/platforms/cpp/tests/fake_server/proxy/message.h:
##########
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//
+
+#pragma once
+
+#include <cstring>
+#include <utility>
+
+namespace ignite::proxy {
+
+struct message {
+    char *m_arr{nullptr};
+    size_t m_size = 0;
+
+    // Exchanges the buffer pointer and the size of two messages; never throws.
+    friend void swap(message& lhs, message& rhs) noexcept {
+        // Both fields are built-in types, so the qualified std::swap is what ADL would pick anyway.
+        std::swap(lhs.m_size, rhs.m_size);
+        std::swap(lhs.m_arr, rhs.m_arr);
+    }
+
+    // Allocates a buffer of `size` bytes and copies `size` bytes from `arr` into it.
+    message(char *arr, size_t size)
+        : m_arr(new char[size])
+        , m_size(size)
+    {
+        std::memcpy(m_arr, arr, m_size);
+    }

Review Comment:
   `message` manually manages a heap buffer (`new[]`/`delete[]`). This 
increases maintenance risk in tests (e.g., future changes to copying/moving). 
Prefer using `std::vector<std::byte>`/`std::vector<char>` (or 
`std::unique_ptr<char[]>`) to represent the payload and avoid manual memory 
management. Also, the input pointer should be `const char*` since the 
constructor does not modify the source buffer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to