Copilot commented on code in PR #7966:
URL: https://github.com/apache/ignite-3/pull/7966#discussion_r3060488404


##########
modules/platforms/cpp/ignite/protocol/partition_assignment.h:
##########
@@ -0,0 +1,38 @@
+/*
+ *  Copyright (C) GridGain Systems. All Rights Reserved.
+ *  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/

Review Comment:
   The new header uses a proprietary GridGain copyright banner instead of the 
standard ASF Apache 2.0 header used elsewhere in this repository. This is a 
licensing/compliance issue; please replace it with the standard ASF license 
header (matching other C++ headers under modules/platforms/cpp/ignite/...).
   ```suggestion
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    *      http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
   ```



##########
modules/platforms/cpp/ignite/common/uuid.h:
##########
@@ -44,7 +46,60 @@ class uuid {
      */
     constexpr uuid(std::int64_t most, std::int64_t least) noexcept
         : most(most)
-        , least(least) {}
+        , least(least) { }
+
+    /**
+     * Parses string-encoded uuid.
+     * Example of possible input value: ff979603-fb56-49e9-bc79-7c4487bbbafd.
+     * @param text String-encoded uuid.
+     * @return @c uuid object if parsing was successful, otherwise empty 
optional.
+     */
+    static std::optional<uuid> from_string(const std::string& text) {
+        if (text.length() != 36) {
+            return {};
+        }
+
+        auto str = text.c_str();
+
+        auto parse_chunk = [str](size_t beg, size_t end, int64_t& out) -> bool 
{
+            char* p;
+            out = std::strtoll(str + beg, &p, 16);
+
+            if (p != str + end || (*p != '-' && *p != '\0') || errno == 
ERANGE) {
+                return false;
+            }

Review Comment:
   `uuid::from_string` uses `errno`/`ERANGE` but this header does not include 
`<cerrno>`, and `errno` is not reset before calling `strtoll`, so a previous 
error can cause false negatives. Include `<cerrno>` and set `errno = 0` before 
each `strtoll` (and/or avoid relying on `errno` unless it’s explicitly cleared).



##########
modules/platforms/cpp/ignite/protocol/partition_assignment.h:
##########
@@ -0,0 +1,38 @@
+/*
+ *  Copyright (C) GridGain Systems. All Rights Reserved.
+ *  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <vector>
+#include <cstdint>
+
+namespace ignite::protocol {
+
+/**
+ * Partition assignment.
+ */
+struct partition_assignment {
+    /** Assignment timestamp. */
+    volatile std::int64_t timestamp{0};
+

Review Comment:
   `volatile` does not provide thread-safety and is generally not appropriate 
for cross-thread synchronization in C++. If this timestamp is accessed 
concurrently, use `std::atomic<std::int64_t>` (or keep it as a plain 
`std::int64_t` and ensure all access is externally synchronized).



##########
modules/platforms/cpp/tests/test-common/proxy/message_listener.h:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include "ignite/network/length_prefix_codec.h"
+#include "ignite/protocol/client_operation.h"
+#include "ignite/protocol/reader.h"
+
+#include <queue>
+#include <shared_mutex>
+#include <utility>
+
+namespace ignite::proxy {
+
+using raw_message = std::vector<char>;
+
+class structured_message {
+public:
+    structured_message() = default;
+
+    structured_message(const structured_message& structured_message) = default;
+
+    explicit structured_message(std::vector<std::byte> data)
+        :m_data(std::move(data)) { }
+
+    [[nodiscard]] protocol::reader payload_reader() const {
+        return protocol::reader{{m_data.data() + m_payload_pos, m_data.size() 
- m_payload_pos}};
+    }
+protected:
+    std::vector<std::byte> m_data{};
+    size_t m_payload_pos{};
+};
+
+/**
+ * Message which clients sends to server.
+ */
+class client_message: public structured_message{
+public:
+    client_message() = default;
+
+    client_message(const client_message& msg) = default;
+
+    explicit client_message(std::vector<std::byte> data)
+        : structured_message(std::move(data))
+    {
+        protocol::reader rd{{m_data.data(), m_data.size()}};
+
+        m_op = static_cast<protocol::client_operation>(rd.read_int32());
+        m_req_id = rd.read_int64();
+
+        m_payload_pos = rd.position();
+
+        assert(m_payload_pos < m_data.size());
+    }

Review Comment:
   `assert(m_payload_pos < m_data.size())` rejects valid messages with an empty 
payload (where `m_payload_pos == m_data.size()`). This can trigger in real 
protocol messages that have only headers. The assert should allow `==` (and 
ideally also validate that the header fields were fully read before using 
`m_payload_pos`).



##########
modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp:
##########
@@ -262,9 +274,36 @@ std::shared_ptr<node_connection> 
cluster_connection::get_random_connected_channe
     return std::next(m_connections.begin(), idx)->second;
 }
 
+std::shared_ptr<node_connection> 
cluster_connection::get_preferred_channel(const std::string& 
preferred_node_name) {
+    std::unique_lock lock(m_connections_mutex);
+    for (auto& [id, conn] : m_connections) {
+        if (conn->m_node_name == preferred_node_name) {
+            return conn;
+        }
+    }

Review Comment:
   `cluster_connection` accesses `conn->m_node_name`, but `m_node_name` is a 
private member of `node_connection` (declared in node_connection.h). This will 
not compile without a friend declaration. Expose a public accessor on 
`node_connection` (e.g., `get_node_name()` / `node_name()`) or make 
`cluster_connection` a friend if that’s acceptable.



##########
modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp:
##########
@@ -472,4 +482,63 @@ std::shared_ptr<table_impl> table_impl::from_facade(table 
&tb) {
     return tb.m_impl;
 }
 
+void table_impl::update_partition_assignment() {
+    ignite_callback<std::shared_ptr<protocol::partition_assignment>> callback 
= [self=shared_from_this()](auto pa) {
+        self->m_partition_assignment = pa.value();
+    };
+
+    load_partition_assignment_async(std::move(callback));
+}
+
+void 
table_impl::load_partition_assignment_async(ignite_callback<std::shared_ptr<protocol::partition_assignment>>
 callback) {
+    std::int64_t timestamp = m_connection->get_assignment_timestamp();
+
+    {
+        auto pa = get_partition_assignment();
+        if (pa && !pa->is_outdated(timestamp)) {
+            m_connection->get_logger()->log_debug("Partition assignment for 
table " + get_name() + " is up to date.");
+
+            callback(std::move(pa));
+            return;
+        }
+    }
+
+    auto writer_func = [id = m_id, timestamp](protocol::writer &writer, auto&) 
{
+        protocol::write_partition_assignment_request(writer, id, timestamp);
+    };
+
+    auto reader_func = [timestamp](protocol::reader &reader) -> 
std::shared_ptr<protocol::partition_assignment> {
+        return protocol::read_partition_assignment_response(reader, timestamp);
+    };
+
+    
m_connection->perform_request<std::shared_ptr<protocol::partition_assignment>>(
+        protocol::client_operation::PARTITION_ASSIGNMENT_GET,
+        nullptr,
+        writer_func,
+        std::move(reader_func),
+        std::move(callback));
+}
+
+std::optional<std::string> table_impl::get_preferred_node_name(const 
ignite_tuple &key, const schema &sch) {
+    auto pa = get_partition_assignment();
+
+    if (!pa) {
+        return {};
+    }
+
+    hash_calculator hc;
+
+    for (auto column : sch.collocated_columns) {
+        auto val = key.get(column->key_index);
+        hc.append(val, column->scale, column->precision);
+    }
+
+    auto hash = hc.result_hash();
+
+
+    auto part_id = std::abs(hash % 
static_cast<int32_t>(pa->partitions.size()));
+
+    return pa->partitions[part_id];

Review Comment:
   `get_preferred_node_name` can divide by zero when `pa->partitions.size() == 
0` (e.g., when the server returns `cnt == 0`, or if an empty assignment is 
present). This makes `hash % size` undefined and will crash. Add a guard for 
empty `partitions` before computing the modulo and return `std::nullopt` 
(falling back to random channel) when mapping is unavailable.



##########
modules/platforms/cpp/tests/test-common/proxy/message_listener.h:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include "ignite/network/length_prefix_codec.h"
+#include "ignite/protocol/client_operation.h"
+#include "ignite/protocol/reader.h"
+
+#include <queue>
+#include <shared_mutex>
+#include <utility>
+
+namespace ignite::proxy {
+
+using raw_message = std::vector<char>;
+
+class structured_message {
+public:
+    structured_message() = default;
+
+    structured_message(const structured_message& structured_message) = default;
+
+    explicit structured_message(std::vector<std::byte> data)
+        :m_data(std::move(data)) { }
+
+    [[nodiscard]] protocol::reader payload_reader() const {
+        return protocol::reader{{m_data.data() + m_payload_pos, m_data.size() 
- m_payload_pos}};
+    }
+protected:
+    std::vector<std::byte> m_data{};
+    size_t m_payload_pos{};
+};
+
+/**
+ * Message which clients sends to server.
+ */
+class client_message: public structured_message{
+public:
+    client_message() = default;
+
+    client_message(const client_message& msg) = default;
+
+    explicit client_message(std::vector<std::byte> data)
+        : structured_message(std::move(data))
+    {
+        protocol::reader rd{{m_data.data(), m_data.size()}};
+
+        m_op = static_cast<protocol::client_operation>(rd.read_int32());
+        m_req_id = rd.read_int64();
+
+        m_payload_pos = rd.position();
+
+        assert(m_payload_pos < m_data.size());
+    }
+
+    [[nodiscard]] protocol::client_operation get_op() const { return m_op; }
+
+    [[nodiscard]] int64_t get_req_id() const { return m_req_id; }
+
+private:
+    protocol::client_operation m_op{};
+    int64_t m_req_id{};
+};
+
+/**
+ * Message which server sends to client.
+ */
+class server_message : public structured_message {
+public:
+    server_message() = default;
+
+    server_message(const server_message&) = default;
+
+    explicit server_message(std::vector<std::byte> data)
+        : structured_message(std::move(data))
+    {
+        protocol::reader rd{{m_data.data(), m_data.size()}};
+
+        m_req_id = rd.read_int64();
+        m_flags = rd.read_int32();
+        m_obs_ts = rd.read_int64();
+
+        m_payload_pos = rd.position();
+
+        assert(m_payload_pos < m_data.size());
+    }
+
+    [[nodiscard]] int64_t get_req_id() const { return m_req_id; }
+
+    [[nodiscard]] int32_t get_flags() const { return m_flags; }
+
+    [[nodiscard]] int64_t get_obs_ts() const { return m_obs_ts; }
+
+private:
+    int64_t m_req_id{};
+    int32_t m_flags{};
+    int64_t m_obs_ts{};
+};
+
+/**
+ * Intercepts messages which go through @c asio_proxy.
+ */
+class message_listener {
+public:
+    void register_message(raw_message msg) {
+        std::unique_lock lock(m_mutex);
+        m_queue.push(std::move(msg));
+    }
+
+    [[nodiscard]] std::queue<raw_message> get_msg_queue() const {
+        std::shared_lock lock(m_mutex);
+        return m_queue;
+    }
+
+    template<typename MESSAGE_TYPE>
+    std::vector<MESSAGE_TYPE> get_next() {
+        std::unique_lock lock(m_mutex);
+
+        std::vector<MESSAGE_TYPE> res;
+
+        while (!m_queue.empty() && res.empty()) {
+            auto chunk = m_queue.front();
+            m_queue.pop();
+
+            network::data_buffer_ref buf{{chunk.data(), chunk.size()}};

Review Comment:
   `get_next()` copies the queued buffer (`auto chunk = m_queue.front();`) and 
keeps the mutex locked while decoding/parsing messages. This is avoidable 
overhead and can block producers unnecessarily. Move the chunk out of the queue 
(e.g., `std::move(m_queue.front())`) and consider releasing the lock before 
decoding to minimize contention.



##########
modules/platforms/cpp/tests/test-common/proxy/message_listener.h:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include "ignite/network/length_prefix_codec.h"
+#include "ignite/protocol/client_operation.h"
+#include "ignite/protocol/reader.h"
+
+#include <queue>
+#include <shared_mutex>
+#include <utility>
+
+namespace ignite::proxy {
+
+using raw_message = std::vector<char>;
+
+class structured_message {
+public:
+    structured_message() = default;
+
+    structured_message(const structured_message& structured_message) = default;
+
+    explicit structured_message(std::vector<std::byte> data)
+        :m_data(std::move(data)) { }
+
+    [[nodiscard]] protocol::reader payload_reader() const {
+        return protocol::reader{{m_data.data() + m_payload_pos, m_data.size() 
- m_payload_pos}};
+    }
+protected:
+    std::vector<std::byte> m_data{};
+    size_t m_payload_pos{};
+};
+
+/**
+ * Message which clients sends to server.
+ */
+class client_message: public structured_message{
+public:
+    client_message() = default;
+
+    client_message(const client_message& msg) = default;
+
+    explicit client_message(std::vector<std::byte> data)
+        : structured_message(std::move(data))
+    {
+        protocol::reader rd{{m_data.data(), m_data.size()}};
+
+        m_op = static_cast<protocol::client_operation>(rd.read_int32());
+        m_req_id = rd.read_int64();
+
+        m_payload_pos = rd.position();
+
+        assert(m_payload_pos < m_data.size());
+    }
+
+    [[nodiscard]] protocol::client_operation get_op() const { return m_op; }
+
+    [[nodiscard]] int64_t get_req_id() const { return m_req_id; }
+
+private:
+    protocol::client_operation m_op{};
+    int64_t m_req_id{};
+};
+
+/**
+ * Message which server sends to client.
+ */
+class server_message : public structured_message {
+public:
+    server_message() = default;
+
+    server_message(const server_message&) = default;
+
+    explicit server_message(std::vector<std::byte> data)
+        : structured_message(std::move(data))
+    {
+        protocol::reader rd{{m_data.data(), m_data.size()}};
+
+        m_req_id = rd.read_int64();
+        m_flags = rd.read_int32();
+        m_obs_ts = rd.read_int64();
+
+        m_payload_pos = rd.position();
+
+        assert(m_payload_pos < m_data.size());
+    }

Review Comment:
   Same issue as in `client_message`: `assert(m_payload_pos < m_data.size())` 
will fail for responses with no payload (payload position equals buffer size). 
This should allow empty payloads (use `<=`) so the proxy can inspect any 
operation safely.



##########
modules/platforms/cpp/tests/client-test/partition_awareness_test.cpp:
##########
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "detail/string_extensions.h"
+#include "ignite_runner_suite.h"
+#include "tests/test-common/test_utils.h"
+
+#include "ignite/client/detail/utils.h"
+#include "ignite/client/ignite_client.h"
+#include "ignite/client/ignite_client_configuration.h"
+
+#include "proxy/asio_proxy.h"
+#include "proxy/message_listener.h"
+
+#include <gtest/gtest.h>
+
+namespace ignite {
+
+using namespace std::chrono_literals;
+
+using part_id_type = std::size_t;
+using key_type = std::int64_t;
+
+constexpr key_type MIN_KEY = 0;
+constexpr key_type MAX_KEY = 1000;
+
+static const std::map<std::string, std::string> NODES_TO_ADDR = {
+    {"org.apache.ignite.internal.runner.app.PlatformTestNodeRunner", 
"127.0.0.1:10942"},
+    {"org.apache.ignite.internal.runner.app.PlatformTestNodeRunner_2", 
"127.0.0.1:10943"}
+};
+
+/**
+ * Parses string-encoded partition distribution.
+ * Example of string:
+ * 
ff979603-fb56-49e9-bc79-7c4487bbbafd=[4];2e14bb82-5cb5-4283-aafd-f2d8552a842a=[5,
 8, 9];
+ * @param encoded String-encoded distribution
+ * @return Conveniently structured distribution.
+ */
+std::map<int64_t, uuid> parse_partition_distribution(const std::string& 
encoded) {
+    std::map<int64_t, uuid> res;
+
+    size_t lhs = 0;
+    size_t rhs = std::string::npos;
+    while ((rhs = encoded.find(';', lhs)) != std::string::npos) {
+        std::string chunk= encoded.substr(lhs, rhs - lhs);
+
+        char eq_pos = chunk.find('=');
+

Review Comment:
   `chunk.find('=')` returns `size_t`, but the result is stored in `char 
eq_pos`. This truncates the position (and turns `npos` into an arbitrary small 
value), causing incorrect parsing and potentially out-of-range substrings. Use 
`size_t` for `eq_pos` and explicitly handle the `npos` case.
   ```suggestion
           std::string::size_type eq_pos = chunk.find('=');
   
           if (eq_pos == std::string::npos) {
               throw std::runtime_error("can't parse partition distribution, 
incorrect input:" + encoded);
           }
   ```



##########
modules/platforms/cpp/ignite/client/detail/node_connection.h:
##########
@@ -157,6 +157,8 @@ class node_connection : public 
std::enable_shared_from_this<node_connection> {
         bool sent = m_pool->send(m_id, std::move(message));
         if (!sent) {
             get_and_remove_handler(req_id);
+            /*m_logger->log_error(*/auto str= "Connection id=" + 
std::to_string(id()) + " req_id=" + std::to_string(req_id) + " failed";//);
+            std::cout << str << std::endl;

Review Comment:
   This introduces direct `std::cout` printing (and commented-out logger code) 
in a core request path. Besides being noisy in production/tests, it can break 
builds (header doesn’t include `<iostream>`) and bypasses the existing 
logging/error handling conventions. Please remove the `std::cout` usage and 
report the send failure via `m_logger` and/or by propagating an `ignite_error` 
to the handler/callback.
   ```suggestion
               m_logger->log_error(
                   "Connection id=" + std::to_string(id()) + " req_id=" + 
std::to_string(req_id) + " failed");
   ```



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java:
##########
@@ -119,6 +120,7 @@
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionDistribution;
 import org.jetbrains.annotations.Nullable;
 

Review Comment:
   Unused imports were added (`java.util.HashMap` and 
`org.apache.ignite.table.partition.PartitionDistribution`) but are not 
referenced anywhere in this file. This will typically fail 
compilation/checkstyle with unused-import rules; please remove them or use them 
as intended.



##########
modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp:
##########
@@ -472,4 +482,63 @@ std::shared_ptr<table_impl> table_impl::from_facade(table 
&tb) {
     return tb.m_impl;
 }
 
+void table_impl::update_partition_assignment() {
+    ignite_callback<std::shared_ptr<protocol::partition_assignment>> callback 
= [self=shared_from_this()](auto pa) {

Review Comment:
   `update_partition_assignment()` callback assumes success (`pa.value()`) and 
writes `m_partition_assignment` without taking `m_partitions_mutex`. This can 
both crash on errors (value() when `has_error()`) and cause a data race with 
`get_partition_assignment()` which *does* lock the mutex. Handle the error case 
(propagate/log) and update `m_partition_assignment` under `m_partitions_mutex`.
   ```suggestion
       ignite_callback<std::shared_ptr<protocol::partition_assignment>> 
callback = [self = shared_from_this()](auto pa) {
           if (pa.has_error()) {
               self->m_connection->get_logger()->log_error(
                   "Failed to update partition assignment for table " + 
self->get_name() + ".");
               return;
           }
   
           std::lock_guard lock(self->m_partitions_mutex);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to