isapego commented on code in PR #7966:
URL: https://github.com/apache/ignite-3/pull/7966#discussion_r3077787797


##########
modules/platforms/cpp/tests/test-common/proxy/message_listener.h:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include "ignite/network/length_prefix_codec.h"
+#include "ignite/protocol/client_operation.h"
+#include "ignite/protocol/reader.h"
+
+#include <atomic>
+#include <cassert>
+#include <queue>
+#include <shared_mutex>
+#include <utility>
+
+namespace ignite::proxy {
+
+using raw_message = std::vector<char>;
+
+class structured_message {
+public:
+    structured_message() = default;
+
+    structured_message(const structured_message& structured_message) = default;
+
+    explicit structured_message(std::vector<std::byte> data)
+        :m_data(std::move(data)) { }
+
+    [[nodiscard]] protocol::reader payload_reader() const {
+        return protocol::reader{{m_data.data() + m_payload_pos, m_data.size() 
- m_payload_pos}};
+    }
+protected:
+    std::vector<std::byte> m_data{};
+    size_t m_payload_pos{};
+};
+
+/**
+ * Message which clients sends to server.
+ */
+class client_message: public structured_message{
+public:
+    client_message() = default;
+
+    client_message(const client_message& msg) = default;
+
+    explicit client_message(std::vector<std::byte> data)
+        : structured_message(std::move(data))
+    {
+        protocol::reader rd{{m_data.data(), m_data.size()}};
+
+        m_op = static_cast<protocol::client_operation>(rd.read_int32());
+        m_req_id = rd.read_int64();
+
+        m_payload_pos = rd.position();
+
+        assert(m_payload_pos <= m_data.size());
+    }
+
+    [[nodiscard]] protocol::client_operation get_op() const { return m_op; }
+
+    [[nodiscard]] int64_t get_req_id() const { return m_req_id; }
+
+private:
+    protocol::client_operation m_op{};
+    int64_t m_req_id{};
+};
+
+/**
+ * Message which server sends to client.
+ */
+class server_message : public structured_message {
+public:
+    server_message() = default;
+
+    server_message(const server_message&) = default;
+
+    explicit server_message(std::vector<std::byte> data)
+        : structured_message(std::move(data))
+    {
+        protocol::reader rd{{m_data.data(), m_data.size()}};
+
+        m_req_id = rd.read_int64();
+        m_flags = rd.read_int32();
+        m_obs_ts = rd.read_int64();
+
+        m_payload_pos = rd.position();
+
+        assert(m_payload_pos <= m_data.size());
+    }
+
+    [[nodiscard]] int64_t get_req_id() const { return m_req_id; }
+
+    [[nodiscard]] int32_t get_flags() const { return m_flags; }
+
+    [[nodiscard]] int64_t get_obs_ts() const { return m_obs_ts; }
+
+private:
+    int64_t m_req_id{};
+    int32_t m_flags{};
+    int64_t m_obs_ts{};
+};
+
+/**
+ * Intercepts messages which go through @c asio_proxy.
+ */
+class message_listener {
+public:
+    void register_message(raw_message msg) {
+        if (enable_message_registration.load()) {
+            std::unique_lock lock(m_mutex);
+            m_queue.push(std::move(msg));
+        }
+    }
+
+    [[nodiscard]] std::queue<raw_message> get_msg_queue() const {
+        std::shared_lock lock(m_mutex);
+        return m_queue;
+    }
+
+    template<typename MESSAGE_TYPE>
+    std::vector<MESSAGE_TYPE> get_next() {
+        std::unique_lock lock(m_mutex);
+
+        std::vector<MESSAGE_TYPE> res;
+
+        while (!m_queue.empty() && res.empty()) {
+            auto& chunk = m_queue.front();
+
+            network::data_buffer_ref buf{{chunk.data(), chunk.size()}};
+
+            while (true) {
+                auto out  = codec.decode(buf);
+
+                if (out.empty()) {
+                    break;
+                }
+
+                auto out_bv = out.get_bytes_view();
+                std::vector<std::byte> data{out_bv.begin(), out_bv.end()};
+                res.emplace_back(std::move(data));
+            }
+            m_queue.pop();
+        }
+
+        return res;
+    }
+
+    /**
+     * Enable/disables message registration for message listeners;
+     * @param enable @c True if registration enabled, otherwise disabled.

Review Comment:
   ```suggestion
        * Enable/disable message registration for message listeners;
        * @param enable @c True if registration enabled, otherwise disabled.
   ```



##########
modules/platforms/cpp/ignite/client/detail/table/schema.h:
##########
@@ -118,30 +125,52 @@ struct schema {
      * @return A new schema instance.
      */
     static std::shared_ptr<schema> create_instance(std::int32_t version, 
std::vector<column> &&cols) {
-        std::int32_t key_columns_cnt = 0;
+        size_t key_columns_cnt = 0;
+        size_t collocated_columns_cnt = 0;

Review Comment:
   Minor: Here and in other places:
   ```suggestion
           std::size_t key_columns_cnt = 0;
           std::size_t collocated_columns_cnt = 0;
   ```



##########
modules/platforms/cpp/ignite/protocol/messages.h:
##########
@@ -66,10 +69,16 @@ struct handshake_response {
     protocol_context context{};
 
     /** Observable timestamp. */
-    std::int64_t observable_timestamp;
+    std::int64_t observable_timestamp{};
 
     /** Server idle timeout in ms. */
-    std::int64_t idle_timeout_ms;
+    std::int64_t idle_timeout_ms{};

Review Comment:
   Let's init it to explicit value if you do anyway. Reads better, requires 
less implicit knowledge.
   ```suggestion
       std::int64_t observable_timestamp{0};
   
       /** Server idle timeout in ms. */
       std::int64_t idle_timeout_ms{0};
   ```



##########
modules/platforms/cpp/ignite/protocol/messages.cpp:
##########
@@ -100,4 +100,39 @@ handshake_response parse_handshake_response(bytes_view 
message) {
     return res;
 }
 
+void write_partition_assignment_request(writer &writer, std::int32_t table_id, 
std::int64_t timestamp) {
+    writer.write(table_id);
+    writer.write(timestamp);
+}
+
+std::shared_ptr<partition_assignment> 
read_partition_assignment_response(reader &reader, std::int64_t timestamp) {
+    auto cnt = reader.read_int32();
+    if (cnt <= 0)
+        throw ignite_error("Invalid partition count: " + std::to_string(cnt));

Review Comment:
   Are we sure partition count can never be zero in response? What about the 
case when the next field `assignment_available` is `false`?



##########
modules/platforms/cpp/ignite/common/uuid_test.cpp:
##########
@@ -64,3 +64,13 @@ TEST(uuid, stream) {
 
     EXPECT_EQ(uuidString, uuidString2);
 }
+
+TEST(uuid, from_string) {
+    std::string uuidText = "4b62e46a-d380-460f-94ea-9b4320752634";
+
+    auto opt = ignite::uuid::from_string(uuidText);
+
+    EXPECT_TRUE(opt.has_value());
+    EXPECT_EQ(5432155248028304911LL, opt->get_most_significant_bits());
+    EXPECT_EQ(-7716184298936261068LL, opt->get_least_significant_bits());
+}

Review Comment:
   Let's add more tests. This is a public interface change, this function will 
be exposed to a user, we have to test it properly. We should definitely check 
that conversation from string and back to string produces the same result at 
least.



##########
modules/platforms/cpp/ignite/common/uuid.h:
##########
@@ -46,6 +50,64 @@ class uuid {
         : most(most)
         , least(least) {}
 
+    /**
+     * Parses string-encoded uuid.
+     * Example of possible input value: ff979603-fb56-49e9-bc79-7c4487bbbafd.
+     * @param text String-encoded uuid.
+     * @return @c uuid object if parsing was successful, otherwise empty 
optional.
+     */
+    static std::optional<uuid> from_string(const std::string& text) {
+        if (text.length() != 36) {
+            return {};
+        }
+
+        auto str = text.c_str();
+
+        auto parse_chunk = [str](size_t beg, size_t end, uint64_t& out) -> 
bool {
+            char* p;
+
+            if (errno != 0) {
+                errno = 0;
+            }
+
+            out = std::strtoull(str + beg, &p, 16);
+
+            if (p != str + end || (*p != '-' && *p != '\0') || errno == 
ERANGE) {
+                return false;
+            }
+
+            return true;
+        };
+
+        uint64_t msb1, msb2, msb3;
+        uint64_t lsb1, lsb2;

Review Comment:
   Minor: here and in other places:
   ```suggestion
           std::uint64_t msb1, msb2, msb3;
           std::uint64_t lsb1, lsb2;
   ```



##########
modules/platforms/cpp/ignite/protocol/partition_assignment.h:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <vector>
+#include <cstdint>
+
+namespace ignite::protocol {
+
+/**
+ * Partition assignment.
+ */
+class partition_assignment {
+public:
+    partition_assignment() = default;
+    partition_assignment(const partition_assignment&) = default;
+    partition_assignment(partition_assignment&&) noexcept = default;
+    partition_assignment &operator=(const partition_assignment &other) = 
default;
+    partition_assignment &operator=(partition_assignment &&other) noexcept = 
default;
+
+    partition_assignment(std::int64_t timestamp, 
std::vector<std::optional<std::string>> partitions)
+        : timestamp(timestamp)
+        , partitions(std::move(partitions)) {}
+
+    /**
+     * Check whether the assignment is outdated.
+     *
+     * @param actual_timestamp Timestamp.
+     * @return @c true if assignment is outdated.
+     */
+    [[nodiscard]] bool is_outdated(std::int64_t actual_timestamp) const { 
return timestamp < actual_timestamp; }
+
+    [[nodiscard]] const std::vector<std::optional<std::string>>& 
get_partitions() const {
+        return partitions;
+    }
+private:
+    /** Assignment timestamp. */
+    std::int64_t timestamp{0};
+
+    /** Partitions. */
+    std::vector<std::optional<std::string>> partitions;

Review Comment:
   It's a class, so fields should start with `m_` prefix. Also, it's not clear 
what `partitions` here - is it a mapping [index (=partition) -> node name]? 
Let's specify it explicitly.
   ```suggestion
       /** Assignment timestamp. */
       std::int64_t m_timestamp{0};
   
       /** Partitions. */
       std::vector<std::optional<std::string>> m_partitions;
   ```



##########
modules/platforms/cpp/ignite/protocol/partition_assignment.h:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <vector>
+#include <cstdint>
+
+namespace ignite::protocol {
+
+/**
+ * Partition assignment.
+ */
+class partition_assignment {
+public:
+    partition_assignment() = default;
+    partition_assignment(const partition_assignment&) = default;
+    partition_assignment(partition_assignment&&) noexcept = default;
+    partition_assignment &operator=(const partition_assignment &other) = 
default;
+    partition_assignment &operator=(partition_assignment &&other) noexcept = 
default;
+
+    partition_assignment(std::int64_t timestamp, 
std::vector<std::optional<std::string>> partitions)
+        : timestamp(timestamp)
+        , partitions(std::move(partitions)) {}
+
+    /**
+     * Check whether the assignment is outdated.
+     *
+     * @param actual_timestamp Timestamp.
+     * @return @c true if assignment is outdated.
+     */
+    [[nodiscard]] bool is_outdated(std::int64_t actual_timestamp) const { 
return timestamp < actual_timestamp; }
+
+    [[nodiscard]] const std::vector<std::optional<std::string>>& 
get_partitions() const {

Review Comment:
   Documentation is missing.



##########
modules/platforms/cpp/ignite/protocol/partition_assignment.h:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <vector>
+#include <cstdint>
+
+namespace ignite::protocol {
+
+/**
+ * Partition assignment.
+ */
+class partition_assignment {
+public:
+    partition_assignment() = default;
+    partition_assignment(const partition_assignment&) = default;
+    partition_assignment(partition_assignment&&) noexcept = default;
+    partition_assignment &operator=(const partition_assignment &other) = 
default;
+    partition_assignment &operator=(partition_assignment &&other) noexcept = 
default;
+
+    partition_assignment(std::int64_t timestamp, 
std::vector<std::optional<std::string>> partitions)

Review Comment:
   Documentation is missing.



##########
modules/platforms/cpp/ignite/protocol/partition_assignment.h:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <vector>
+#include <cstdint>
+
+namespace ignite::protocol {
+
+/**
+ * Partition assignment.
+ */
+class partition_assignment {
+public:
+    partition_assignment() = default;
+    partition_assignment(const partition_assignment&) = default;
+    partition_assignment(partition_assignment&&) noexcept = default;
+    partition_assignment &operator=(const partition_assignment &other) = 
default;
+    partition_assignment &operator=(partition_assignment &&other) noexcept = 
default;

Review Comment:
   Only default constructor is needed (see [core 
guidelines](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#rc-zero))
   ```suggestion
       partition_assignment() = default;
   ```



##########
modules/platforms/cpp/ignite/client/detail/cluster_connection.h:
##########
@@ -289,6 +319,22 @@ class cluster_connection : public 
std::enable_shared_from_this<cluster_connectio
      */
     std::shared_ptr<node_connection> get_random_connected_channel();
 
+    /**
+     * Get a preferred node connection if partition mapping is provided
+     * otherwise returns random node connection.
+     * @param preferred_node_name Name of preferred node.
+     * @return Node connection.

Review Comment:
   Here and in other places: spacing, and don't wrap too short (Allowed line 
length is 120)
   ```suggestion
       /**
        * Get a preferred node connection if partition mapping is provided 
otherwise returns random node connection.
        *
        * @param preferred_node_name Name of preferred node.
        * @return Node connection.
   ```



##########
modules/platforms/cpp/tests/test-common/proxy/message_listener.h:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//

Review Comment:
   Here and in other new files: our standard header is different



##########
modules/platforms/cpp/ignite/common/uuid.h:
##########
@@ -46,6 +50,64 @@ class uuid {
         : most(most)
         , least(least) {}
 
+    /**
+     * Parses string-encoded uuid.
+     * Example of possible input value: ff979603-fb56-49e9-bc79-7c4487bbbafd.
+     * @param text String-encoded uuid.
+     * @return @c uuid object if parsing was successful, otherwise empty 
optional.
+     */
+    static std::optional<uuid> from_string(const std::string& text) {

Review Comment:
   Function definition should go to cpp - it's a public header, we're hiding 
the implementation.



##########
modules/platforms/cpp/ignite/client/detail/cluster_connection.h:
##########
@@ -289,6 +319,22 @@ class cluster_connection : public 
std::enable_shared_from_this<cluster_connectio
      */
     std::shared_ptr<node_connection> get_random_connected_channel();
 
+    /**
+     * Get a preferred node connection if partition mapping is provided
+     * otherwise returns random node connection.
+     * @param preferred_node_name Name of preferred node.
+     * @return Node connection.
+     */
+    std::shared_ptr<node_connection> get_preferred_channel(const std::string& 
preferred_node_name);
+
+    /**
+     * Get connection according to provided partition mapping
+     * otherwise returns random node connection.
+     * @param pm Partition mapping.
+     * @return Node connection.
+     */
+    std::shared_ptr<node_connection> get_connected_channel(const 
std::optional<std::string> &preferred_node_name);

Review Comment:
   This is a bit confusing. Let's just leave `get_preferred_channel`, which 
acts as `get_connected_channel`. You don't use `get_preferred_channel` on 
itself now anywhere anyway.



##########
modules/platforms/cpp/ignite/protocol/messages.cpp:
##########
@@ -100,4 +100,39 @@ handshake_response parse_handshake_response(bytes_view 
message) {
     return res;
 }
 
+void write_partition_assignment_request(writer &writer, std::int32_t table_id, 
std::int64_t timestamp) {
+    writer.write(table_id);
+    writer.write(timestamp);
+}
+
+std::shared_ptr<partition_assignment> 
read_partition_assignment_response(reader &reader, std::int64_t timestamp) {
+    auto cnt = reader.read_int32();
+    if (cnt <= 0)
+        throw ignite_error("Invalid partition count: " + std::to_string(cnt));
+
+    std::vector<std::optional<std::string>> partitions;
+    partitions.reserve(cnt);
+
+    bool assignment_available = reader.read_bool();
+    if (!assignment_available) {
+        // Invalidate the current assignment so that we can retry on the next 
call.
+        // Return an empty array so that per-partition batches can be 
initialized.
+        // We'll get the actual assignment on the next call.
+        partitions.insert(partitions.end(), cnt, std::nullopt);
+        return std::make_shared<partition_assignment>(0, 
std::move(partitions));
+    }
+
+    // Returned timestamp can be newer than requested.
+    std::int64_t ts = reader.read_int64();
+    if (ts < timestamp)
+        throw ignite_error("Returned timestamp is older than requested: " + 
std::to_string(ts) + " < "
+            + std::to_string(timestamp));

Review Comment:
   Should it really be an exception though? It seems like a normal situation 
that can happen, not some kind of error.



##########
modules/platforms/cpp/ignite/client/detail/node_connection.h:
##########
@@ -232,7 +232,14 @@ class node_connection : public 
std::enable_shared_from_this<node_connection> {
      */
     std::shared_ptr<ignite_logger> get_logger() const { return m_logger; }
 
-    void handle_timeouts();
+    /**
+     * Cancels waiting for over-due responses.
+     */
+     void handle_timeouts();
+
+    const std::string& get_node_name() const {

Review Comment:
   The documentation missing.



##########
modules/platforms/cpp/ignite/common/uuid.h:
##########
@@ -46,6 +50,64 @@ class uuid {
         : most(most)
         , least(least) {}
 
+    /**
+     * Parses string-encoded uuid.
+     * Example of possible input value: ff979603-fb56-49e9-bc79-7c4487bbbafd.

Review Comment:
   ```suggestion
        * Parse string-encoded uuid.
        *
        * Example of possible input value: ff979603-fb56-49e9-bc79-7c4487bbbafd.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to