Copilot commented on code in PR #7966:
URL: https://github.com/apache/ignite-3/pull/7966#discussion_r3067154184
##########
modules/platforms/cpp/ignite/common/uuid.h:
##########
@@ -44,7 +47,65 @@ class uuid {
*/
constexpr uuid(std::int64_t most, std::int64_t least) noexcept
: most(most)
- , least(least) {}
+ , least(least) { }
+
+ /**
+ * Parses string-encoded uuid.
+ * Example of possible input value: ff979603-fb56-49e9-bc79-7c4487bbbafd.
+ * @param text String-encoded uuid.
+ * @return @c uuid object if parsing was successful, otherwise empty
optional.
+ */
+ static std::optional<uuid> from_string(const std::string& text) {
+ if (text.length() != 36) {
+ return {};
+ }
+
+ auto str = text.c_str();
+
+ auto parse_chunk = [str](size_t beg, size_t end, int64_t& out) -> bool
{
+ char* p;
+
+ if (errno != 0) {
+ errno = 0;
+ }
+
+ out = std::strtoll(str + beg, &p, 16);
+
+ if (p != str + end || (*p != '-' && *p != '\0') || errno ==
ERANGE) {
+ return false;
+ }
+
+ return true;
+ };
+
+ int64_t msb1, msb2, msb3;
+ int64_t lsb1, lsb2;
+
+ if (!parse_chunk(0, 8, msb1)) {
+ return {};
+ }
+
+ if (!parse_chunk(9, 13, msb2)) {
+ return {};
+ }
+
+ if (!parse_chunk(14, 18, msb3)) {
+ return {};
+ }
+
+ if (!parse_chunk(19, 23, lsb1)) {
+ return {};
+ }
+
+ if (!parse_chunk(24, 36, lsb2)) {
+ return {};
+ }
+
+ int64_t msb = msb1 << 32 | msb2 << 16 | msb3;
+ int64_t lsb = lsb1 << 48 | lsb2;
+
+ return uuid{msb, lsb};
Review Comment:
`uuid::from_string` builds the 64-bit halves using left shifts on signed
`int64_t` values (`msb1 << 32`, `lsb1 << 48`). Before C++20, left-shifting a
negative signed value is undefined behavior (and `strtoll` accepts a leading
`-`, so a parsed chunk can be negative), and shifting a set bit into the sign
bit relies on an implementation-defined conversion, so results can differ
across compilers/optimizations. Use `uint64_t` intermediates for the bit
assembly (as the stream operator already does) and only cast to `int64_t` at
the end.
```suggestion
std::uint64_t msb =
(static_cast<std::uint64_t>(msb1) << 32) |
(static_cast<std::uint64_t>(msb2) << 16) | static_cast<std::uint64_t>(msb3);
std::uint64_t lsb = (static_cast<std::uint64_t>(lsb1) << 48) |
static_cast<std::uint64_t>(lsb2);
return uuid{static_cast<std::int64_t>(msb),
static_cast<std::int64_t>(lsb)};
```
##########
modules/platforms/cpp/tests/test-common/proxy/message_listener.h:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include "ignite/network/length_prefix_codec.h"
+#include "ignite/protocol/client_operation.h"
+#include "ignite/protocol/reader.h"
+
+#include <queue>
+#include <shared_mutex>
+#include <utility>
+
+namespace ignite::proxy {
+
+using raw_message = std::vector<char>;
+
+class structured_message {
+public:
+ structured_message() = default;
+
+ structured_message(const structured_message& structured_message) = default;
+
+ explicit structured_message(std::vector<std::byte> data)
+ :m_data(std::move(data)) { }
+
+ [[nodiscard]] protocol::reader payload_reader() const {
+ return protocol::reader{{m_data.data() + m_payload_pos, m_data.size()
- m_payload_pos}};
+ }
+protected:
+ std::vector<std::byte> m_data{};
+ size_t m_payload_pos{};
+};
+
+/**
+ * Message which clients sends to server.
+ */
+class client_message: public structured_message{
+public:
+ client_message() = default;
+
+ client_message(const client_message& msg) = default;
+
+ explicit client_message(std::vector<std::byte> data)
+ : structured_message(std::move(data))
+ {
+ protocol::reader rd{{m_data.data(), m_data.size()}};
+
+ m_op = static_cast<protocol::client_operation>(rd.read_int32());
+ m_req_id = rd.read_int64();
+
+ m_payload_pos = rd.position();
+
+ assert(m_payload_pos == 0 || m_payload_pos < m_data.size());
Review Comment:
`assert(m_payload_pos == 0 || m_payload_pos < m_data.size())` rejects the
valid case where the payload is empty (`m_payload_pos == m_data.size()`). This
can trip assertions for operations/responses with no payload. Allow `<=
m_data.size()` (and ideally validate that the header read didn't run past the
end).
```suggestion
assert(m_payload_pos <= m_data.size());
```
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java:
##########
@@ -57,6 +57,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
Review Comment:
This import appears unused in the file. If Checkstyle/IDE inspections are
enforced in CI, unused imports can fail the build or at least create noise.
Remove the `HashMap` import if it's not needed.
```suggestion
```
##########
modules/platforms/cpp/tests/client-test/partition_awareness_test.cpp:
##########
@@ -0,0 +1,623 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "detail/string_extensions.h"
+#include "ignite_runner_suite.h"
+#include "tests/test-common/test_utils.h"
+
+#include "ignite/client/detail/utils.h"
+#include "ignite/client/ignite_client.h"
+#include "ignite/client/ignite_client_configuration.h"
+
+#include "proxy/asio_proxy.h"
+#include "proxy/message_listener.h"
+
+#include <gtest/gtest.h>
+
+namespace ignite {
+
+using namespace std::chrono_literals;
+
+using part_id_type = std::size_t;
+using key_type = std::int64_t;
+
+constexpr key_type MIN_KEY = 0;
+constexpr key_type MAX_KEY = 1000;
+
+static const std::map<std::string, std::string> NODES_TO_ADDR = {
+ {"org.apache.ignite.internal.runner.app.PlatformTestNodeRunner",
"127.0.0.1:10942"},
+ {"org.apache.ignite.internal.runner.app.PlatformTestNodeRunner_2",
"127.0.0.1:10943"}
+};
+
+/**
+ * Parses string-encoded partition distribution.
+ * Example of string:
+ *
ff979603-fb56-49e9-bc79-7c4487bbbafd=[4];2e14bb82-5cb5-4283-aafd-f2d8552a842a=[5,
8, 9];
+ * @param encoded String-encoded distribution
+ * @return Conveniently structured distribution.
+ */
+std::map<int64_t, uuid> parse_partition_distribution(const std::string&
encoded) {
+ std::map<int64_t, uuid> res;
+
+ size_t lhs = 0;
+ size_t rhs = std::string::npos;
+ while ((rhs = encoded.find(';', lhs)) != std::string::npos) {
+ std::string chunk= encoded.substr(lhs, rhs - lhs);
+
+ auto eq_pos = chunk.find('=');
+
+ if (eq_pos == std::string::npos) {
+ throw std::runtime_error("Incorrect partition distribution text:"
+ encoded);
+ }
+
+ auto node_id = uuid::from_string(chunk.substr(0, eq_pos));
+
+ if (!node_id.has_value()) {
+ throw std::runtime_error("can't parse partition distribution,
incorrect input:" + encoded);
+ }
+
+ std::string num_list = chunk.substr(eq_pos + 1); // e.g. [1,2,3]
+
+ size_t lo = 1;
+ size_t hi = std::string::npos;
+ while ((hi = num_list.find_first_of(",]", lo)) != std::string::npos) {
+ int64_t part_id = std::stoll(num_list.substr(lo, hi - lo));
+ res[part_id] = *node_id;
+ lo = hi + 1;
+ }
+ lhs = rhs + 1;
+ }
+
+ return res;
+}
+
+class partition_awareness_test : public ignite_runner_suite {
+protected:
+ void SetUp() override {
+ ignite_client_configuration client_cfg;
+ client_cfg.set_endpoints(get_node_addrs());
+
+ m_direct_client = ignite_client::start(client_cfg, 5s);
+
+ auto nodes = m_direct_client.get_cluster_nodes();
+
+
+ for (auto& node: nodes) {
+ if (NODES_TO_ADDR.count(node.get_name())) {
+ m_endpoint_to_node_id[NODES_TO_ADDR.at(node.get_name())] =
node.get_id();
+ }
+ }
+
+ ASSERT_EQ(m_endpoint_to_node_id.size(), get_node_addrs().size()) <<
"Incorrect node names or address!"
+ << "This test should be able to connect to all non-ssl nodes";
+
+ collect_partition_distribution();
+
+ setup_proxy();
+ }
+
+ static ignite_tuple key_tup(key_type key) {
+ return get_tuple(key);
+ }
+
+ static ignite_tuple prim_val(key_type key) {
+ return get_tuple(std::to_string(key));
+ }
+
+ static ignite_tuple alt_val(key_type key) {
+ return get_tuple(std::to_string(key + MAX_KEY));
+ }
+
+ static ignite_tuple prim_rec(key_type key) {
+ return get_tuple(key, std::to_string(key));
+ }
+
+ static ignite_tuple alt_rec(key_type key) {
+ return get_tuple(key, std::to_string(key + MAX_KEY));
+ }
+
+ void collect_partition_distribution() {
+ std::map<key_type, int64_t> key_to_part;
+
+ populate_table();
+
+ std::stringstream sql;
+ sql << "select " << KEY_COLUMN << ", " << PART_PSEUDOCOLUMN << " from
" << TABLE_1 << " order by " << KEY_COLUMN;
+
+ auto rs = m_direct_client.get_sql().execute(nullptr, nullptr,
{sql.str()}, {});
+ do {
+ auto page = rs.current_page();
+
+ for (auto &rec : page) {
+ auto key = rec.get(KEY_COLUMN).get<key_type>();
+ auto part_id = rec.get(PART_PSEUDOCOLUMN).get<int64_t>();
+
+ key_to_part[key] = part_id;
+ }
+
+ if (rs.has_more_pages())
+ rs.fetch_next_page();
+ else
+ break;
+ } while (true);
+
+
+ auto job_exec = m_direct_client.get_compute().submit(
+ job_target::any_node(m_direct_client.get_cluster_nodes()),
+ job_descriptor::builder{GET_PART_DISTRIBUTION_JOB}.build(),
+ {TABLE_1}
+ );
+
+ auto res = job_exec.get_result()->get_primitive().get<std::string>();
+
+ auto part_to_node = parse_partition_distribution(res);
+
+ for (auto [key, part_id] : key_to_part) {
+ m_key_distribution[key] = part_to_node[part_id];
+
+ auto node_id = part_to_node[part_id];
+ std::string endpoint;
+ for (auto& [ep, id] : m_endpoint_to_node_id) {
+ if (node_id == id) {
+ endpoint = ep;
+ }
+ }
Review Comment:
`endpoint` is assigned but never used. With `-Werror` enabled (see top-level
CMake `WARNINGS_AS_ERRORS`), this can fail the build. Remove the variable or
use it for an assertion (e.g., verify every `node_id` maps to a known endpoint).
```suggestion
bool endpoint_found = false;
for (auto& [ep, id] : m_endpoint_to_node_id) {
if (node_id == id) {
endpoint_found = true;
break;
}
}
ASSERT_TRUE(endpoint_found);
```
##########
modules/platforms/cpp/tests/test-common/proxy/message_listener.h:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include "ignite/network/length_prefix_codec.h"
+#include "ignite/protocol/client_operation.h"
+#include "ignite/protocol/reader.h"
+
Review Comment:
This header relies on transitive includes for `assert` and
`std::atomic_bool` (no `<cassert>` / `<atomic>` included here). That makes the
header non-self-contained and can break builds if include order changes or it
is included directly elsewhere. Add the missing standard headers explicitly.
```suggestion
#include <atomic>
#include <cassert>
```
##########
modules/platforms/cpp/ignite/protocol/messages.cpp:
##########
@@ -100,4 +100,39 @@ handshake_response parse_handshake_response(bytes_view
message) {
return res;
}
+void write_partition_assignment_request(writer &writer, std::int32_t table_id,
std::int64_t timestamp) {
+ writer.write(table_id);
+ writer.write(timestamp);
+}
+
+std::shared_ptr<partition_assignment>
read_partition_assignment_response(reader &reader, std::int64_t timestamp) {
+ auto cnt = reader.read_int32();
+ if (cnt < 0)
+ throw ignite_error("Invalid partition count: " + std::to_string(cnt));
+
+ std::vector<std::optional<std::string>> partitions;
+ partitions.reserve(cnt);
Review Comment:
`read_partition_assignment_response` accepts `cnt == 0` as valid and
proceeds, but server/client implementations treat non-positive partition counts
as invalid. Consider rejecting `cnt <= 0` and failing fast with a clear error
to avoid silently creating an empty assignment and disabling partition
awareness.
##########
modules/platforms/cpp/ignite/client/detail/cluster_connection.h:
##########
@@ -395,6 +448,9 @@ class cluster_connection : public
std::enable_shared_from_this<cluster_connectio
/** Node connections. */
std::unordered_map<uint64_t, std::shared_ptr<node_connection>>
m_connections;
+ /** Node consistent ID to node instance mapping. */
+ std::unordered_map<std::string, node_connection*> m_consistent_id_mapping;
+
Review Comment:
`m_consistent_id_mapping` is added but not used anywhere (no reads/writes in
the codebase). If this was intended to optimize preferred-node lookups, please
either wire it up (update on connect/disconnect and use it in
`get_preferred_channel`) or remove it to avoid dead state and confusion.
```suggestion
```
##########
modules/platforms/cpp/ignite/protocol/partition_assignment.h:
##########
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
Review Comment:
The Apache license header formatting is inconsistent (`* Licensed...` line
is missing the usual leading space). Some repos have automated checks that
require the exact header template, so this can break CI. Align this header with
the standard `/*\n * Licensed ...` format used elsewhere in the C++ sources.
##########
modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp:
##########
@@ -472,4 +584,68 @@ std::shared_ptr<table_impl> table_impl::from_facade(table
&tb) {
return tb.m_impl;
}
+void table_impl::update_partition_assignment() {
+ ignite_callback<std::shared_ptr<protocol::partition_assignment>> callback
= [self=shared_from_this()](auto pa) {
+ if (pa.has_error()) {
+ self->m_connection->get_logger()->log_error("Error while updating
partition assignment for table"
+ + self->get_name() + "error " + pa.error().what_str());
+
+ return;
+ }
+
+ std::lock_guard lock(self->m_partitions_mutex);
+ self->m_partition_assignment = std::move(pa).value();
+ };
+
+ load_partition_assignment_async(std::move(callback));
+}
Review Comment:
`update_partition_assignment()` triggers `load_partition_assignment_async()`
on every operation and there is no de-duplication of in-flight assignment
requests. Under concurrent load (or when the assignment is outdated), multiple
threads can send many `PARTITION_ASSIGNMENT_GET` requests for the same
table/timestamp. Consider caching the in-flight future/result per
table+timestamp (similar to Java `ClientTable#getPartitionAssignment`) or
adding a simple in-progress flag guarded by `m_partitions_mutex`.
##########
modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp:
##########
@@ -472,4 +584,68 @@ std::shared_ptr<table_impl> table_impl::from_facade(table
&tb) {
return tb.m_impl;
}
+void table_impl::update_partition_assignment() {
+ ignite_callback<std::shared_ptr<protocol::partition_assignment>> callback
= [self=shared_from_this()](auto pa) {
+ if (pa.has_error()) {
+ self->m_connection->get_logger()->log_error("Error while updating
partition assignment for table"
+ + self->get_name() + "error " + pa.error().what_str());
+
+ return;
+ }
+
+ std::lock_guard lock(self->m_partitions_mutex);
+ self->m_partition_assignment = std::move(pa).value();
+ };
+
+ load_partition_assignment_async(std::move(callback));
+}
+
+void
table_impl::load_partition_assignment_async(ignite_callback<std::shared_ptr<protocol::partition_assignment>>
callback) {
+ std::int64_t timestamp = m_connection->get_assignment_timestamp();
+
+ auto pa = get_partition_assignment();
+ if (pa && !pa->is_outdated(timestamp)) {
+ m_connection->get_logger()->log_debug("Partition assignment for table
" + get_name() + " is up to date.");
+
+ callback(std::move(pa));
+ return;
+ }
+
+ auto writer_func = [id = m_id, timestamp](protocol::writer &writer, auto&)
{
+ protocol::write_partition_assignment_request(writer, id, timestamp);
+ };
+
+ auto reader_func = [timestamp](protocol::reader &reader) ->
std::shared_ptr<protocol::partition_assignment> {
+ return protocol::read_partition_assignment_response(reader, timestamp);
+ };
+
+
m_connection->perform_request<std::shared_ptr<protocol::partition_assignment>>(
+ protocol::client_operation::PARTITION_ASSIGNMENT_GET,
+ nullptr,
+ writer_func,
+ std::move(reader_func),
+ std::move(callback));
+}
+
+std::optional<std::string> table_impl::get_preferred_node_name(const
ignite_tuple &key_or_rec, const schema &sch) {
+ auto pa = get_partition_assignment();
+
+ if (!pa || pa->get_partitions().empty()) {
+ m_connection->get_logger()->log_debug("No partition distribution
available, random node will called");
+ return {};
+ }
+
+ hash_calculator hc;
+ for (auto column : sch.collocated_columns) {
+ auto val = key_or_rec.get(column->key_index);
Review Comment:
`get_preferred_node_name` reads tuple values by `column->key_index` (tuple
ordinal), but tuples are serialized by column *name* (see `pack_tuple`:
`tuple.column_ordinal(col.name)`). For record tuples (and even key tuples built
with a different insertion order), `key_index` is not a stable ordinal, so the
hash (and partition selection) can be wrong. Use column names (or schema column
indices) to fetch values from `ignite_tuple` when computing the colocated hash.
```suggestion
auto val = key_or_rec.get(column->name);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]