This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bd26df60e3e IGNITE-22141 C++ Client: Implement Partition Awareness 
(#7966)
bd26df60e3e is described below

commit bd26df60e3e5278d7baba573aad8e5dcf44c0488
Author: Ed Rakhmankulov <[email protected]>
AuthorDate: Tue Apr 14 19:12:23 2026 +0300

    IGNITE-22141 C++ Client: Implement Partition Awareness (#7966)
---
 .../platform_tests/PlatformCppOdbcTestsDebLinux.kt |   1 -
 .../platform_tests/PlatformCppOdbcTestsRpmLinux.kt |   1 -
 .../platform_tests/PlatformCppOdbcTestsTgzLinux.kt |   1 -
 .../test/platform_tests/PlatformCppTestsLinux.kt   |   1 -
 .../test/platform_tests/PlatformCppTestsWindows.kt |   1 -
 .../ignite/client/detail/cluster_connection.cpp    |  50 +-
 .../cpp/ignite/client/detail/cluster_connection.h  |  68 +-
 .../client/detail/connection_event_handler.h       |   7 +
 .../cpp/ignite/client/detail/node_connection.cpp   |   7 +-
 .../cpp/ignite/client/detail/node_connection.h     |  18 +
 .../cpp/ignite/client/detail/table/schema.h        |  51 +-
 .../cpp/ignite/client/detail/table/table_impl.cpp  | 224 ++++++-
 .../cpp/ignite/client/detail/table/table_impl.h    |  35 +
 modules/platforms/cpp/ignite/common/uuid.cpp       |  52 ++
 modules/platforms/cpp/ignite/common/uuid.h         |  13 +
 modules/platforms/cpp/ignite/common/uuid_test.cpp  |  78 +++
 .../cpp/ignite/network/length_prefix_codec.h       |   3 +-
 .../cpp/ignite/protocol/client_operation.h         |   3 +
 modules/platforms/cpp/ignite/protocol/messages.cpp |  39 +-
 modules/platforms/cpp/ignite/protocol/messages.h   |  31 +-
 .../cpp/ignite/protocol/partition_assignment.h     |  74 +++
 .../platforms/cpp/tests/client-test/CMakeLists.txt |   1 +
 .../cpp/tests/client-test/ignite_runner_suite.h    |  22 +
 .../tests/client-test/partition_awareness_test.cpp | 737 +++++++++++++++++++++
 .../platforms/cpp/tests/fake_server/CMakeLists.txt |   2 +-
 .../cpp/tests/fake_server/connection_test.cpp      |   2 +-
 .../cpp/tests/fake_server/proxy/message_listener.h |  38 --
 .../platforms/cpp/tests/test-common/CMakeLists.txt |   2 +-
 .../proxy/asio_proxy.h                             |  15 +-
 .../cpp/tests/test-common/proxy/message_listener.h | 179 +++++
 .../apache/ignite/internal/runner/app/Jobs.java    |  29 +
 31 files changed, 1677 insertions(+), 108 deletions(-)

diff --git a/.teamcity/test/platform_tests/PlatformCppOdbcTestsDebLinux.kt 
b/.teamcity/test/platform_tests/PlatformCppOdbcTestsDebLinux.kt
index e9435c29f32..0a0e1797158 100644
--- a/.teamcity/test/platform_tests/PlatformCppOdbcTestsDebLinux.kt
+++ b/.teamcity/test/platform_tests/PlatformCppOdbcTestsDebLinux.kt
@@ -20,7 +20,6 @@ object PlatformCppOdbcTestsDebLinux : BuildType({
     """.trimIndent()
 
     params {
-        param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
         param("PATH__CMAKE_BUILD_DIRECTORY", 
"%PATH__WORKING_DIR%/cmake-build-debug")
         param("PATH__ODBC_TEST_RESULTS", 
"%PATH__WORKING_DIR%/odbc_tests_results.xml")
         text("PATH__WORKING_DIR", 
"%teamcity.build.checkoutDir%/%VCSROOT__IGNITE3%/modules/platforms/cpp", 
display = ParameterDisplay.HIDDEN, allowEmpty = true)
diff --git a/.teamcity/test/platform_tests/PlatformCppOdbcTestsRpmLinux.kt 
b/.teamcity/test/platform_tests/PlatformCppOdbcTestsRpmLinux.kt
index 40d5a89ea91..6e475d3e9a0 100644
--- a/.teamcity/test/platform_tests/PlatformCppOdbcTestsRpmLinux.kt
+++ b/.teamcity/test/platform_tests/PlatformCppOdbcTestsRpmLinux.kt
@@ -20,7 +20,6 @@ object PlatformCppOdbcTestsRpmLinux : BuildType({
     """.trimIndent()
 
     params {
-        param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
         param("PATH__CMAKE_BUILD_DIRECTORY", 
"%PATH__WORKING_DIR%/cmake-build-debug")
         param("PATH__ODBC_TEST_RESULTS", 
"%PATH__WORKING_DIR%/odbc_tests_results.xml")
         text("PATH__WORKING_DIR", 
"%teamcity.build.checkoutDir%/%VCSROOT__IGNITE3%/modules/platforms/cpp", 
display = ParameterDisplay.HIDDEN, allowEmpty = true)
diff --git a/.teamcity/test/platform_tests/PlatformCppOdbcTestsTgzLinux.kt 
b/.teamcity/test/platform_tests/PlatformCppOdbcTestsTgzLinux.kt
index dae6fa5ef03..5bc42a296ae 100644
--- a/.teamcity/test/platform_tests/PlatformCppOdbcTestsTgzLinux.kt
+++ b/.teamcity/test/platform_tests/PlatformCppOdbcTestsTgzLinux.kt
@@ -20,7 +20,6 @@ object PlatformCppOdbcTestsTgzLinux : BuildType({
     """.trimIndent()
 
     params {
-        param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
         param("PATH__CMAKE_BUILD_DIRECTORY", 
"%PATH__WORKING_DIR%/cmake-build-debug")
         param("PATH__ODBC_TEST_RESULTS", 
"%PATH__WORKING_DIR%/odbc_tests_results.xml")
         text("PATH__WORKING_DIR", 
"%teamcity.build.checkoutDir%/%VCSROOT__IGNITE3%/modules/platforms/cpp", 
display = ParameterDisplay.HIDDEN, allowEmpty = true)
diff --git a/.teamcity/test/platform_tests/PlatformCppTestsLinux.kt 
b/.teamcity/test/platform_tests/PlatformCppTestsLinux.kt
index 1446d9463c8..1861433b81b 100644
--- a/.teamcity/test/platform_tests/PlatformCppTestsLinux.kt
+++ b/.teamcity/test/platform_tests/PlatformCppTestsLinux.kt
@@ -24,7 +24,6 @@ object PlatformCppTestsLinux : BuildType({
     """.trimIndent()
 
     params {
-        param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
         param("PATH__CMAKE_BUILD_DIRECTORY", 
"%PATH__WORKING_DIR%/cmake-build-debug")
         param("PATH__CLIENT_TEST_RESULTS", 
"%PATH__WORKING_DIR%/cpp_client_tests_results.xml")
         param("PATH__UNIT_TESTS_RESULT", 
"%PATH__WORKING_DIR%/cpp_unit_test_results.xml")
diff --git a/.teamcity/test/platform_tests/PlatformCppTestsWindows.kt 
b/.teamcity/test/platform_tests/PlatformCppTestsWindows.kt
index ad89eeb12fb..0614a9b7984 100644
--- a/.teamcity/test/platform_tests/PlatformCppTestsWindows.kt
+++ b/.teamcity/test/platform_tests/PlatformCppTestsWindows.kt
@@ -27,7 +27,6 @@ object PlatformCppTestsWindows : BuildType({
     """.trimIndent()
 
     params {
-        hiddenText("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
         hiddenText("PATH__CMAKE_BUILD_DIRECTORY", 
"""%PATH__WORKING_DIR%\cmake-build-debug""")
         hiddenText("PATH__CLIENT_TEST_RESULTS", 
"""%PATH__CMAKE_BUILD_DIRECTORY%\cpp_client_tests_results.xml""")
         hiddenText("PATH__ODBC_TEST_RESULTS", 
"""%PATH__CMAKE_BUILD_DIRECTORY%\odbc_tests_results.xml""")
diff --git a/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp 
b/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
index 075403afdcc..d866cd54e49 100644
--- a/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
@@ -92,7 +92,9 @@ void cluster_connection::stop() {
 
 void cluster_connection::on_connection_success(const end_point &addr, uint64_t 
id) {
     m_logger->log_info("Established connection with remote host " + 
addr.to_string());
-    m_logger->log_debug("Connection ID: " + std::to_string(id));
+
+    if (m_logger->is_debug_enabled())
+        m_logger->log_debug("Connection ID: " + std::to_string(id));
 
     auto connection = node_connection::make_new(
         id, m_pool, weak_from_this(), m_logger, m_configuration, 
m_timer_thread);
@@ -214,6 +216,16 @@ void 
cluster_connection::on_observable_timestamp_changed(std::int64_t timestamp)
     }
 }
 
+void cluster_connection::on_partition_assignment_changed(std::int64_t 
timestamp) {
+    auto expected = m_assignment_timestamp.load();
+    while (expected < timestamp) {
+        auto success = m_assignment_timestamp.compare_exchange_weak(expected, 
timestamp);
+        if (success)
+            return;
+        expected = m_assignment_timestamp.load();
+    }
+}
+
 void cluster_connection::remove_client(uint64_t id) {
     [[maybe_unused]] std::unique_lock<std::recursive_mutex> 
lock(m_connections_mutex);
 
@@ -262,9 +274,28 @@ std::shared_ptr<node_connection> 
cluster_connection::get_random_connected_channe
     return std::next(m_connections.begin(), idx)->second;
 }
 
+std::shared_ptr<node_connection> cluster_connection::get_channel(
+    const std::optional<std::string>& preferred_node_name) {
+
+    if (preferred_node_name) {
+        std::unique_lock lock(m_connections_mutex);
+        for (auto& [id, conn] : m_connections) {
+            if (conn->get_node_name() == *preferred_node_name) {
+                return conn;
+            }
+        }
+    }
+
+    return get_random_connected_channel();
+}
+
 std::pair<std::shared_ptr<node_connection>, std::int64_t> 
cluster_connection::perform_request_handler(
-    const operation_function_type &op_func, transaction_impl *tx, const 
writer_function_type &wr,
-    const std::shared_ptr<response_handler> &handler) {
+    const operation_function_type &op_func,
+    transaction_impl *tx,
+    const writer_function_type &wr,
+    const std::shared_ptr<response_handler> &handler,
+    const std::optional<std::string>& preferred_node_name) {
+
     if (tx) {
         auto channel = tx->get_connection();
         if (!channel)
@@ -279,7 +310,7 @@ std::pair<std::shared_ptr<node_connection>, std::int64_t> 
cluster_connection::pe
     }
 
     while (true) {
-        auto channel = get_random_connected_channel();
+        auto channel = get_channel(preferred_node_name);
         if (!channel)
             throw ignite_error(error::code::CONNECTION, "No nodes connected");
 
@@ -290,10 +321,15 @@ std::pair<std::shared_ptr<node_connection>, std::int64_t> 
cluster_connection::pe
     }
 }
 
-void cluster_connection::perform_request_raw(protocol::client_operation op, 
transaction_impl *tx,
-    const writer_function_type &wr, ignite_callback<bytes_view> callback) {
+void cluster_connection::perform_request_raw(
+    protocol::client_operation op,
+    transaction_impl *tx,
+    const writer_function_type &wr,
+    ignite_callback<bytes_view> callback,
+    const std::optional<std::string>& preferred_node_name) {
     auto handler = std::make_shared<response_handler_raw>(std::move(callback));
-    perform_request_handler(static_op(op), tx, wr, std::move(handler));
+
+    perform_request_handler(static_op(op), tx, wr, std::move(handler), 
preferred_node_name);
 }
 
 } // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/cluster_connection.h 
b/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
index d75912d1eae..972b0b8ac76 100644
--- a/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
+++ b/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
@@ -106,10 +106,15 @@ public:
      * @param tx Transaction.
      * @param wr Request writer function.
      * @param handler Request handler.
+     * @param preferred_node_name Name of preferred node.
      * @return A connection used to perform request and the request ID.
      */
-    std::pair<std::shared_ptr<node_connection>, std::int64_t> 
perform_request_handler(const operation_function_type &op_func,
-        transaction_impl *tx, const writer_function_type &wr, const 
std::shared_ptr<response_handler> &handler);
+    std::pair<std::shared_ptr<node_connection>, std::int64_t> 
perform_request_handler(
+        const operation_function_type &op_func,
+        transaction_impl *tx,
+        const writer_function_type &wr,
+        const std::shared_ptr<response_handler> &handler,
+        const std::optional<std::string>& preferred_node_name = std::nullopt);
 
     /**
      * Perform request raw.
@@ -119,9 +124,14 @@ public:
      * @param tx Transaction.
      * @param wr Request writer function.
      * @param callback Callback to call on a result.
+     * @param preferred_node_name Name of preferred node.
      */
-    void perform_request_raw(protocol::client_operation op, transaction_impl 
*tx,
-        const writer_function_type &wr, ignite_callback<bytes_view> callback);
+    void perform_request_raw(
+        protocol::client_operation op,
+        transaction_impl *tx,
+        const writer_function_type &wr,
+        ignite_callback<bytes_view> callback,
+        const std::optional<std::string>& preferred_node_name = std::nullopt);
 
     /**
      * Perform request raw.
@@ -151,13 +161,15 @@ public:
      * @param wr Request writer function.
      * @param rd Response reader function.
      * @param callback Callback to call on a result.
+     * @param preferred_node_name Name of preferred node.
      */
     template<typename T>
     std::pair<std::shared_ptr<node_connection>, std::int64_t> 
perform_request(protocol::client_operation op,
         transaction_impl *tx, const writer_function_type &wr,
-        reader_function_type<T> rd, ignite_callback<T> callback) {
+        reader_function_type<T> rd, ignite_callback<T> callback,
+        const std::optional<std::string>& preferred_node_name = std::nullopt) {
         auto handler = 
std::make_shared<response_handler_reader<T>>(std::move(rd), 
std::move(callback));
-        return perform_request_handler(static_op(op), tx, wr, 
std::move(handler));
+        return perform_request_handler(static_op(op), tx, wr, 
std::move(handler), preferred_node_name);
     }
 
     /**
@@ -189,7 +201,7 @@ public:
     void perform_request(protocol::client_operation op, const 
writer_function_type &wr,
         std::function<T(protocol::reader &, std::shared_ptr<node_connection>)> 
rd, ignite_callback<T> callback) {
         auto handler = 
std::make_shared<response_handler_reader_connection<T>>(std::move(rd), 
std::move(callback));
-        perform_request_handler(static_op(op), nullptr, wr, 
std::move(handler));
+        perform_request_handler(static_op(op), nullptr, wr, 
std::move(handler), std::nullopt);
     }
 
     /**
@@ -261,9 +273,13 @@ public:
      * @return A connection used to perform request and the request ID.
      */
     template<typename T>
-    std::pair<std::shared_ptr<node_connection>, std::int64_t> 
perform_request_wr(protocol::client_operation op,
-        transaction_impl *tx, const writer_function_type &wr, 
ignite_callback<T> callback) {
-        return perform_request<T>(op, tx, wr, [](protocol::reader &) {}, 
std::move(callback));
+    std::pair<std::shared_ptr<node_connection>, std::int64_t> 
perform_request_wr(
+        protocol::client_operation op,
+        transaction_impl *tx,
+        const writer_function_type &wr,
+        ignite_callback<T> callback,
+        const std::optional<std::string>& preferred_node_name = std::nullopt) {
+        return perform_request<T>(op, tx, wr, [](protocol::reader &) {}, 
std::move(callback), preferred_node_name);
     }
 
     /**
@@ -273,6 +289,20 @@ public:
      */
     std::int64_t get_observable_timestamp() const { return 
m_observable_timestamp.load(); }
 
+    /**
+     * Get assignment timestamp.
+     *
+     * @return Assignment timestamp.
+     */
+    std::int64_t get_assignment_timestamp() const { return 
m_assignment_timestamp.load(); }
+
+    /**
+     * Get logger.
+     *
+     * @return Logger.
+     */
+    [[nodiscard]] std::shared_ptr<ignite_logger> get_logger() const { return 
m_logger; }
+
     /**
      * @param op Operation code to return.
      * @return A function that always returns the same operation.
@@ -289,6 +319,14 @@ private:
      */
     std::shared_ptr<node_connection> get_random_connected_channel();
 
+    /**
+     * Get connection according to provided preference otherwise returns 
random node connection.
+     *
+     * @param preferred_node_name Name of preferred node.
+     * @return Node connection.
+     */
+    std::shared_ptr<node_connection> get_channel(const 
std::optional<std::string> &preferred_node_name);
+
     /**
      * Constructor.
      *
@@ -342,6 +380,13 @@ private:
      */
     void on_observable_timestamp_changed(std::int64_t timestamp) override;
 
+    /**
+     * Handle partition assignment change.
+     *
+     * @param timestamp Assignment timestamp.
+     */
+    void on_partition_assignment_changed(std::int64_t timestamp) override;
+
     /**
      * Remove client.
      *
@@ -404,6 +449,9 @@ private:
     /** Observable timestamp. */
     std::atomic_int64_t m_observable_timestamp{0};
 
+    /** Partition assignment timestamp. */
+    std::atomic_int64_t m_assignment_timestamp{0};
+
     /** Timer thread. */
     std::shared_ptr<thread_timer> m_timer_thread;
 };
diff --git 
a/modules/platforms/cpp/ignite/client/detail/connection_event_handler.h 
b/modules/platforms/cpp/ignite/client/detail/connection_event_handler.h
index d8e5322ccca..50ed06dce79 100644
--- a/modules/platforms/cpp/ignite/client/detail/connection_event_handler.h
+++ b/modules/platforms/cpp/ignite/client/detail/connection_event_handler.h
@@ -44,6 +44,13 @@ public:
      * @param timestamp Timestamp.
      */
     virtual void on_observable_timestamp_changed(std::int64_t timestamp) = 0;
+
+    /**
+     * Handle partition assignment change.
+     *
+     * @param timestamp Assignment timestamp.
+     */
+    virtual void on_partition_assignment_changed(std::int64_t timestamp) = 0;
 };
 
 } // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp 
b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
index 6881ee16a68..6e2219c0604 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
@@ -69,9 +69,12 @@ void node_connection::process_message(bytes_view msg) {
 
     auto req_id = reader.read_int64();
     auto flags = reader.read_int32();
+    auto event_handler = m_event_handler.lock();
     if (test_flag(flags, 
protocol::response_flag::PARTITION_ASSIGNMENT_CHANGED)) {
         auto assignment_ts = reader.read_int64();
-        UNUSED_VALUE assignment_ts;
+        if (event_handler) {
+            event_handler->on_partition_assignment_changed(assignment_ts);
+        }
     }
 
     auto observable_timestamp = reader.read_int64();
@@ -180,6 +183,8 @@ ignite_result<void> 
node_connection::process_handshake_rsp(bytes_view msg) {
         std::chrono::milliseconds(response.idle_timeout_ms));
 
     m_protocol_context = response.context;
+    m_node_id = response.node_id;
+    m_node_name = response.node_name;
     m_handshake_complete = true;
 
     if (m_heartbeat_interval.count()) {
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.h 
b/modules/platforms/cpp/ignite/client/detail/node_connection.h
index 5e130e37829..f6146cb88cf 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.h
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.h
@@ -232,8 +232,20 @@ public:
      */
     std::shared_ptr<ignite_logger> get_logger() const { return m_logger; }
 
+    /**
+     * Cancels waiting for over-due responses.
+     */
     void handle_timeouts();
 
+    /**
+     * Name of the node this connection is tethered to.
+     *
+     * @return Name of the node.
+     */
+    const std::string& get_node_name() const {
+        return m_node_name;
+    }
+
 private:
     /**
      * Constructor.
@@ -334,6 +346,12 @@ private:
 
     /** Timer thread. */
     std::weak_ptr<thread_timer> m_timer_thread;
+
+    /** Node id. */
+    uuid m_node_id{};
+
+    /** Name of the node this connection is tethered to. */
+    std::string m_node_name{};
 };
 
 } // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/table/schema.h 
b/modules/platforms/cpp/ignite/client/detail/table/schema.h
index 71be10ef565..0d8a0c8ad80 100644
--- a/modules/platforms/cpp/ignite/client/detail/table/schema.h
+++ b/modules/platforms/cpp/ignite/client/detail/table/schema.h
@@ -79,6 +79,7 @@ struct schema {
     const std::vector<column> columns;
     const std::vector<const column *> key_columns;
     const std::vector<const column *> val_columns;
+    const std::vector<const column *> collocated_columns;
 
     // Default
     schema() = default;
@@ -86,17 +87,23 @@ struct schema {
     /**
      * Constructor.
      *
-     * @param version Version.
-     * @param columns Columns.
-     * @param key_columns Key Columns.
-     * @param val_columns Value Columns.
+     * @param version Schema version.
+     * @param columns All columns.
+     * @param key_columns Key columns.
+     * @param val_columns Value columns.
+     * @param collocated_columns Collocated columns.
      */
-    schema(std::int32_t version, std::vector<column> &&columns, 
std::vector<const column *> &&key_columns,
-        std::vector<const column *> &&val_columns)
+    schema(
+        std::int32_t version,
+        std::vector<column> &&columns,
+        std::vector<const column *> &&key_columns,
+        std::vector<const column *> &&val_columns,
+        std::vector<const column *> &&collocated_columns)
         : version(version)
         , columns(std::move(columns))
         , key_columns(std::move(key_columns))
-        , val_columns(std::move(val_columns)) {}
+        , val_columns(std::move(val_columns))
+        , collocated_columns(std::move(collocated_columns)) {}
 
     /**
      * Get column by index.
@@ -118,30 +125,52 @@ struct schema {
      * @return A new schema instance.
      */
     static std::shared_ptr<schema> create_instance(std::int32_t version, 
std::vector<column> &&cols) {
-        std::int32_t key_columns_cnt = 0;
+        std::size_t key_columns_cnt = 0;
+        std::size_t collocated_columns_cnt = 0;
         for (const auto &column : cols) {
             if (column.is_key())
                 ++key_columns_cnt;
+
+            if (column.colocation_index >= 0)
+                ++collocated_columns_cnt;
         }
-        std::int32_t val_columns_cnt = std::int32_t(cols.size()) - 
key_columns_cnt;
+
+        const size_t val_columns_cnt = cols.size() - key_columns_cnt;
 
         std::vector<const column *> key_columns(key_columns_cnt, nullptr);
 
         std::vector<const column *> val_columns;
         val_columns.reserve(val_columns_cnt);
 
+        assert(collocated_columns_cnt <= key_columns_cnt);
+
+        std::vector<const column *> collocated_columns(collocated_columns_cnt, 
nullptr);
+
         for (const auto &column : cols) {
             if (column.is_key()) {
-                assert(column.key_index >= 0 && std::size_t(column.key_index) 
< key_columns.size());
+                assert(column.key_index >= 0 && 
static_cast<std::size_t>(column.key_index) < key_columns.size());
                 assert(key_columns[column.key_index] == nullptr);
 
                 key_columns[column.key_index] = &column;
             } else {
                 val_columns.push_back(&column);
             }
+
+            if (column.colocation_index >= 0) {
+                assert(static_cast<std::size_t>(column.colocation_index) < 
collocated_columns.size());
+                assert(collocated_columns[column.colocation_index] == nullptr);
+
+                collocated_columns[column.colocation_index] = &column;
+            }
         }
 
-        return std::make_shared<schema>(version, std::move(cols), 
std::move(key_columns), std::move(val_columns));
+        return std::make_shared<schema>(
+            version,
+            std::move(cols),
+            std::move(key_columns),
+            std::move(val_columns),
+            std::move(collocated_columns)
+        );
     }
 
     /**
diff --git a/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp 
b/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
index 19c5059c659..d0b35daf66f 100644
--- a/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
@@ -17,6 +17,7 @@
 
 #include "table_impl.h"
 
+#include "detail/hash_calculator.h"
 #include "ignite/client/detail/transaction/transaction_impl.h"
 #include "ignite/client/detail/utils.h"
 #include "ignite/client/table/table.h"
@@ -149,6 +150,8 @@ void table_impl::load_schema_async(
 void table_impl::get_async(
     transaction *tx, const ignite_tuple &key, 
ignite_callback<std::optional<ignite_tuple>> callback) {
 
+    update_partition_assignment();
+
     with_proper_schema_async<std::optional<ignite_tuple>>(std::move(callback),
         [self = shared_from_this(), key = std::make_shared<ignite_tuple>(key), 
tx0 = to_impl(tx)](
             const schema &sch, auto callback) mutable {
@@ -165,13 +168,22 @@ void table_impl::get_async(
                     callback(read_tuple_opt(reader, &sch));
                 });
 
+            auto preferred_node = self->get_preferred_node_name(*key, sch);
+
             self->m_connection->perform_request_raw(
-                protocol::client_operation::TUPLE_GET, tx0.get(), writer_func, 
std::move(handle_func));
+                protocol::client_operation::TUPLE_GET,
+                tx0.get(),
+                writer_func,
+                std::move(handle_func),
+                preferred_node
+            );
         });
 }
 
 void table_impl::contains_async(transaction *tx, const ignite_tuple &key, 
ignite_callback<bool> callback) {
 
+    update_partition_assignment();
+
     with_proper_schema_async<bool>(std::move(callback),
         [self = shared_from_this(), key = std::make_shared<ignite_tuple>(key), 
tx0 = to_impl(tx)](
             const schema &sch, auto callback) mutable {
@@ -186,8 +198,16 @@ void table_impl::contains_async(transaction *tx, const 
ignite_tuple &key, ignite
                 return reader.read_bool();
             };
 
-            
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_CONTAINS_KEY,
 tx0.get(),
-                writer_func, std::move(reader_func), std::move(callback));
+            auto preferred_node = self->get_preferred_node_name(*key, sch);
+
+            self->m_connection->perform_request<bool>(
+                protocol::client_operation::TUPLE_CONTAINS_KEY,
+                tx0.get(),
+                writer_func,
+                std::move(reader_func),
+                std::move(callback),
+                preferred_node
+            );
         });
 }
 
@@ -213,6 +233,9 @@ void table_impl::get_all_async(transaction *tx, 
std::vector<ignite_tuple> keys,
 }
 
 void table_impl::upsert_async(transaction *tx, const ignite_tuple &record, 
ignite_callback<void> callback) {
+
+    update_partition_assignment();
+
     with_proper_schema_async<void>(std::move(callback),
         [self = shared_from_this(), record = ignite_tuple(record), tx0 = 
to_impl(tx)](
             const schema &sch, auto callback) mutable {
@@ -221,8 +244,15 @@ void table_impl::upsert_async(transaction *tx, const 
ignite_tuple &record, ignit
                 write_tuple(writer, sch, record, false);
             };
 
+            auto preferred_node = self->get_preferred_node_name(record, sch);
+
             self->m_connection->perform_request_wr(
-                protocol::client_operation::TUPLE_UPSERT, tx0.get(), 
writer_func, std::move(callback));
+                protocol::client_operation::TUPLE_UPSERT,
+                tx0.get(),
+                writer_func,
+                std::move(callback),
+                preferred_node
+            );
         });
 }
 
@@ -244,12 +274,14 @@ void table_impl::upsert_all_async(transaction *tx, 
std::vector<ignite_tuple> rec
 void table_impl::get_and_upsert_async(
     transaction *tx, const ignite_tuple &record, 
ignite_callback<std::optional<ignite_tuple>> callback) {
 
+    update_partition_assignment();
+
     with_proper_schema_async<std::optional<ignite_tuple>>(std::move(callback),
-        [self = shared_from_this(), record = 
std::make_shared<ignite_tuple>(record), tx0 = to_impl(tx)](
+        [self = shared_from_this(), record, tx0 = to_impl(tx)](
             const schema &sch, auto callback) {
             auto writer_func = [self, record, &sch, &tx0](protocol::writer 
&writer, auto&) {
                 write_table_operation_header(writer, self->m_id, tx0.get(), 
sch);
-                write_tuple(writer, sch, *record, false);
+                write_tuple(writer, sch, record, false);
             };
 
             auto handle_func = 
make_schema_handler_function<std::optional<ignite_tuple>>(
@@ -257,12 +289,22 @@ void table_impl::get_and_upsert_async(
                     callback(read_tuple_opt(reader, &sch));
                 });
 
+            auto preferred_node = self->get_preferred_node_name(record, sch);
+
             self->m_connection->perform_request_raw(
-                protocol::client_operation::TUPLE_GET_AND_UPSERT, tx0.get(), 
writer_func, std::move(handle_func));
+                protocol::client_operation::TUPLE_GET_AND_UPSERT,
+                tx0.get(),
+                writer_func,
+                std::move(handle_func),
+                preferred_node
+            );
         });
 }
 
 void table_impl::insert_async(transaction *tx, const ignite_tuple &record, 
ignite_callback<bool> callback) {
+
+    update_partition_assignment();
+
     with_proper_schema_async<bool>(std::move(callback),
         [self = shared_from_this(), record = ignite_tuple(record), tx0 = 
to_impl(tx)](
             const schema &sch, auto callback) mutable {
@@ -277,8 +319,16 @@ void table_impl::insert_async(transaction *tx, const 
ignite_tuple &record, ignit
                 return reader.read_bool();
             };
 
-            
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_INSERT,
 tx0.get(), writer_func,
-                std::move(reader_func), std::move(callback));
+            auto preferred_node = self->get_preferred_node_name(record, sch);
+
+            self->m_connection->perform_request<bool>(
+                protocol::client_operation::TUPLE_INSERT,
+                tx0.get(),
+                writer_func,
+                std::move(reader_func),
+                std::move(callback),
+                preferred_node
+            );
         });
 }
 
@@ -305,10 +355,13 @@ void table_impl::insert_all_async(
 }
 
 void table_impl::replace_async(transaction *tx, const ignite_tuple &record, 
ignite_callback<bool> callback) {
+
+    update_partition_assignment();
+
     with_proper_schema_async<bool>(std::move(callback),
         [self = shared_from_this(), record = ignite_tuple(record), tx0 = 
to_impl(tx)](
             const schema &sch, auto callback) mutable {
-            auto writer_func = [self, &record, &sch, &tx0](protocol::writer 
&writer, auto&) {
+            auto writer_func = [self, &record, &sch, &tx0](protocol::writer 
&writer, auto &) {
                 write_table_operation_header(writer, self->m_id, tx0.get(), 
sch);
                 write_tuple(writer, sch, record, false);
             };
@@ -319,13 +372,24 @@ void table_impl::replace_async(transaction *tx, const 
ignite_tuple &record, igni
                 return reader.read_bool();
             };
 
-            
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_REPLACE,
 tx0.get(), writer_func,
-                std::move(reader_func), std::move(callback));
+            auto preferred_node = self->get_preferred_node_name(record, sch);
+
+            self->m_connection->perform_request<bool>(
+                protocol::client_operation::TUPLE_REPLACE,
+                tx0.get(),
+                writer_func,
+                std::move(reader_func),
+                std::move(callback),
+                preferred_node
+            );
         });
 }
 
 void table_impl::replace_async(
     transaction *tx, const ignite_tuple &record, const ignite_tuple 
&new_record, ignite_callback<bool> callback) {
+
+    update_partition_assignment();
+
     with_proper_schema_async<bool>(std::move(callback),
         [self = shared_from_this(), record = ignite_tuple(record), new_record 
= ignite_tuple(new_record),
             tx0 = to_impl(tx)](const schema &sch, auto callback) mutable {
@@ -341,20 +405,30 @@ void table_impl::replace_async(
                 return reader.read_bool();
             };
 
-            
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_REPLACE_EXACT,
 tx0.get(),
-                writer_func, std::move(reader_func), std::move(callback));
+            auto preferred_node = self->get_preferred_node_name(record, sch);
+
+            self->m_connection->perform_request<bool>(
+                protocol::client_operation::TUPLE_REPLACE_EXACT,
+                tx0.get(),
+                writer_func,
+                std::move(reader_func),
+                std::move(callback),
+                preferred_node
+            );
         });
 }
 
 void table_impl::get_and_replace_async(
     transaction *tx, const ignite_tuple &record, 
ignite_callback<std::optional<ignite_tuple>> callback) {
 
+    update_partition_assignment();
+
     with_proper_schema_async<std::optional<ignite_tuple>>(std::move(callback),
-        [self = shared_from_this(), record = 
std::make_shared<ignite_tuple>(record), tx0 = to_impl(tx)](
+        [self = shared_from_this(), record, tx0 = to_impl(tx)](
             const schema &sch, auto callback) mutable {
             auto writer_func = [self, record, &sch, &tx0](protocol::writer 
&writer, auto&) {
                 write_table_operation_header(writer, self->m_id, tx0.get(), 
sch);
-                write_tuple(writer, sch, *record, false);
+                write_tuple(writer, sch, record, false);
             };
 
             auto handle_func = 
make_schema_handler_function<std::optional<ignite_tuple>>(
@@ -362,12 +436,22 @@ void table_impl::get_and_replace_async(
                     callback(read_tuple_opt(reader, &sch));
                 });
 
+            auto preferred_node = self->get_preferred_node_name(record, sch);
+
             self->m_connection->perform_request_raw(
-                protocol::client_operation::TUPLE_GET_AND_REPLACE, tx0.get(), 
writer_func, std::move(handle_func));
+                protocol::client_operation::TUPLE_GET_AND_REPLACE,
+                tx0.get(),
+                writer_func,
+                std::move(handle_func),
+                preferred_node
+            );
         });
 }
 
 void table_impl::remove_async(transaction *tx, const ignite_tuple &key, 
ignite_callback<bool> callback) {
+
+    update_partition_assignment();
+
     with_proper_schema_async<bool>(std::move(callback),
         [self = shared_from_this(), record = ignite_tuple(key), tx0 = 
to_impl(tx)](
             const schema &sch, auto callback) mutable {
@@ -382,12 +466,23 @@ void table_impl::remove_async(transaction *tx, const 
ignite_tuple &key, ignite_c
                 return reader.read_bool();
             };
 
-            
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_DELETE,
 tx0.get(), writer_func,
-                std::move(reader_func), std::move(callback));
+            auto preferred_node = self->get_preferred_node_name(record, sch);
+
+            self->m_connection->perform_request<bool>(
+                protocol::client_operation::TUPLE_DELETE,
+                tx0.get(),
+                writer_func,
+                std::move(reader_func),
+                std::move(callback),
+                preferred_node
+            );
         });
 }
 
 void table_impl::remove_exact_async(transaction *tx, const ignite_tuple 
&record, ignite_callback<bool> callback) {
+
+    update_partition_assignment();
+
     with_proper_schema_async<bool>(std::move(callback),
         [self = shared_from_this(), record = ignite_tuple(record), tx0 = 
to_impl(tx)](
             const schema &sch, auto callback) mutable {
@@ -402,20 +497,30 @@ void table_impl::remove_exact_async(transaction *tx, 
const ignite_tuple &record,
                 return reader.read_bool();
             };
 
-            
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_DELETE_EXACT,
 tx0.get(),
-                writer_func, std::move(reader_func), std::move(callback));
+            auto preferred_node = self->get_preferred_node_name(record, sch);
+
+            self->m_connection->perform_request<bool>(
+                protocol::client_operation::TUPLE_DELETE_EXACT,
+                tx0.get(),
+                writer_func,
+                std::move(reader_func),
+                std::move(callback),
+                preferred_node
+            );
         });
 }
 
 void table_impl::get_and_remove_async(
     transaction *tx, const ignite_tuple &key, 
ignite_callback<std::optional<ignite_tuple>> callback) {
 
+    update_partition_assignment();
+
     with_proper_schema_async<std::optional<ignite_tuple>>(std::move(callback),
-        [self = shared_from_this(), record = 
std::make_shared<ignite_tuple>(key), tx0 = to_impl(tx)](
+        [self = shared_from_this(), record = key, tx0 = to_impl(tx)](
             const schema &sch, auto callback) mutable {
             auto writer_func = [self, record, &sch, &tx0](protocol::writer 
&writer, auto&) {
                 write_table_operation_header(writer, self->m_id, tx0.get(), 
sch);
-                write_tuple(writer, sch, *record, true);
+                write_tuple(writer, sch, record, true);
             };
 
             auto handle_func = 
make_schema_handler_function<std::optional<ignite_tuple>>(
@@ -423,8 +528,15 @@ void table_impl::get_and_remove_async(
                     callback(read_tuple_opt(reader, &sch));
                 });
 
+            auto preferred_node = self->get_preferred_node_name(record, sch);
+
             self->m_connection->perform_request_raw(
-                protocol::client_operation::TUPLE_GET_AND_DELETE, tx0.get(), 
writer_func, std::move(handle_func));
+                protocol::client_operation::TUPLE_GET_AND_DELETE,
+                tx0.get(),
+                writer_func,
+                std::move(handle_func),
+                preferred_node
+            );
         });
 }
 
@@ -472,4 +584,68 @@ std::shared_ptr<table_impl> table_impl::from_facade(table 
&tb) {
     return tb.m_impl;
 }
 
+void table_impl::update_partition_assignment() {
+    ignite_callback<std::shared_ptr<protocol::partition_assignment>> callback 
= [self=shared_from_this()](auto pa) {
+        if (pa.has_error()) {
+            self->m_connection->get_logger()->log_warning("Error while 
updating partition assignment for table '"
+            + self->get_name() + "': " + pa.error().what_str());
+
+            return;
+        }
+
+        std::lock_guard lock(self->m_partitions_mutex);
+        self->m_partition_assignment = std::move(pa).value();
+    };
+
+    load_partition_assignment_async(std::move(callback));
+}
+
+void 
table_impl::load_partition_assignment_async(ignite_callback<std::shared_ptr<protocol::partition_assignment>>
 callback) {
+    std::int64_t timestamp = m_connection->get_assignment_timestamp();
+
+    auto pa = get_partition_assignment();
+    if (pa && !pa->is_outdated(timestamp)) {
+        m_connection->get_logger()->log_debug("Partition assignment for table 
" + get_name() + " is up to date.");
+
+        callback(std::move(pa));
+        return;
+    }
+
+    auto writer_func = [id = m_id, timestamp](protocol::writer &writer, auto&) 
{
+        protocol::write_partition_assignment_request(writer, id, timestamp);
+    };
+
+    auto reader_func = [timestamp](protocol::reader &reader) -> 
std::shared_ptr<protocol::partition_assignment> {
+        return protocol::read_partition_assignment_response(reader, timestamp);
+    };
+
+    
m_connection->perform_request<std::shared_ptr<protocol::partition_assignment>>(
+        protocol::client_operation::PARTITION_ASSIGNMENT_GET,
+        nullptr,
+        writer_func,
+        std::move(reader_func),
+        std::move(callback));
+}
+
+std::optional<std::string> table_impl::get_preferred_node_name(const 
ignite_tuple &key_or_rec, const schema &sch) {
+    auto pa = get_partition_assignment();
+
+    if (!pa || pa->get_partitions().empty()) {
+        m_connection->get_logger()->log_debug("No partition distribution 
available, a random node will be called");
+        return {};
+    }
+
+    hash_calculator hc;
+    for (auto column : sch.collocated_columns) {
+        const auto& val = key_or_rec.get(column->key_index);
+        hc.append(val, column->scale, column->precision);
+    }
+
+    auto hash = hc.result_hash();
+
+    auto part_id = std::abs(hash % 
static_cast<int32_t>(pa->get_partitions().size()));
+
+    return pa->get_partitions()[part_id];
+}
+
 } // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/table/table_impl.h 
b/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
index f99adca4640..47a84280553 100644
--- a/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
@@ -23,6 +23,8 @@
 #include "ignite/client/table/qualified_name.h"
 #include "ignite/client/transaction/transaction.h"
 
+#include "ignite/protocol/partition_assignment.h"
+
 #include <memory>
 #include <mutex>
 #include <unordered_map>
@@ -364,6 +366,7 @@ public:
      * @return Implementation.
      */
     [[nodiscard]] static std::shared_ptr<table_impl> from_facade(table &tb);
+    void update_partition_assignment();
 
     /**
      * Get table ID.
@@ -409,6 +412,32 @@ private:
         m_schemas[val->version] = val;
     }
 
+    /**
+     * Get partition assignment.
+     *
+     * @return Partition assignment.
+     */
+    std::shared_ptr<protocol::partition_assignment> get_partition_assignment() 
{
+        std::lock_guard lock(m_partitions_mutex);
+        auto assignment = m_partition_assignment;
+        return assignment;
+    }
+
+    /**
+     * Load partition assignment.
+     *
+     * @param callback Callback to call with the actual assignment.
+     */
+    void 
load_partition_assignment_async(ignite_callback<std::shared_ptr<protocol::partition_assignment>>
 callback);
+
+    /**
+     * Returns name of node which should contain primary replica for partition 
record is belongs to.
+     * @param key_or_rec Key part of record or whole record.
+     * @param sch Schema
+     * @return Node name with replica or @c std::nullopt.
+     */
+    std::optional<std::string> get_preferred_node_name(const ignite_tuple 
&key_or_rec, const schema &sch);
+
     /**
      * Get impl of transaction.
      * @param tx Transaction.
@@ -433,6 +462,12 @@ private:
 
     /** Schemas. */
     std::unordered_map<int32_t, std::shared_ptr<schema>> m_schemas;
+
+    /** Partitions mutex. */
+    std::recursive_mutex m_partitions_mutex;
+
+    /** Partition assignment. */
+    std::shared_ptr<protocol::partition_assignment> m_partition_assignment;
 };
 
 } // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/common/uuid.cpp 
b/modules/platforms/cpp/ignite/common/uuid.cpp
index 22bd65d0fa8..5038dbf37e3 100644
--- a/modules/platforms/cpp/ignite/common/uuid.cpp
+++ b/modules/platforms/cpp/ignite/common/uuid.cpp
@@ -22,6 +22,58 @@
 
 namespace ignite {
 
+std::optional<uuid> uuid::from_string(const std::string &text) {
+    if (text.length() != 36) {
+        return {};
+    }
+
+    auto str = text.c_str();
+
+    auto parse_chunk = [str](size_t beg, size_t end, uint64_t &out) -> bool {
+        char *p;
+
+        if (errno != 0) {
+            errno = 0;
+        }
+
+        out = std::strtoull(str + beg, &p, 16);
+
+        if (p != str + end || (*p != '-' && *p != '\0') || errno == ERANGE) {
+            return false;
+        }
+
+        return true;
+    };
+
+    std::uint64_t msb1, msb2, msb3;
+    std::uint64_t lsb1, lsb2;
+
+    if (!parse_chunk(0, 8, msb1)) {
+        return {};
+    }
+
+    if (!parse_chunk(9, 13, msb2)) {
+        return {};
+    }
+
+    if (!parse_chunk(14, 18, msb3)) {
+        return {};
+    }
+
+    if (!parse_chunk(19, 23, lsb1)) {
+        return {};
+    }
+
+    if (!parse_chunk(24, 36, lsb2)) {
+        return {};
+    }
+
+    uint64_t msb = msb1 << 32 | msb2 << 16 | msb3;
+    uint64_t lsb = lsb1 << 48 | lsb2;
+
+    return uuid{static_cast<int64_t>(msb), static_cast<int64_t>(lsb)};
+}
+
 uuid uuid::random() {
     static std::mutex random_mutex;
     static std::random_device rd;
diff --git a/modules/platforms/cpp/ignite/common/uuid.h 
b/modules/platforms/cpp/ignite/common/uuid.h
index abb01a874ec..b81710bb858 100644
--- a/modules/platforms/cpp/ignite/common/uuid.h
+++ b/modules/platforms/cpp/ignite/common/uuid.h
@@ -17,10 +17,14 @@
 
 #pragma once
 
+#include <cerrno>
 #include <cstdint>
+#include <cstdlib>
 #include <iomanip>
 #include <istream>
+#include <optional>
 #include <ostream>
+#include <string>
 
 namespace ignite {
 
@@ -46,6 +50,15 @@ public:
         : most(most)
         , least(least) {}
 
+    /**
+     * Parses string-encoded uuid.
+     *
+     * Example of possible input value: ff979603-fb56-49e9-bc79-7c4487bbbafd.
+     * @param text String-encoded uuid.
+     * @return @c uuid object if parsing was successful, otherwise empty 
optional.
+     */
+    static std::optional<uuid> from_string(const std::string& text);
+
     /**
      * Make random UUID.
      *
diff --git a/modules/platforms/cpp/ignite/common/uuid_test.cpp 
b/modules/platforms/cpp/ignite/common/uuid_test.cpp
index 5770daccc62..25017c59954 100644
--- a/modules/platforms/cpp/ignite/common/uuid_test.cpp
+++ b/modules/platforms/cpp/ignite/common/uuid_test.cpp
@@ -64,3 +64,81 @@ TEST(uuid, stream) {
 
     EXPECT_EQ(uuidString, uuidString2);
 }
+
+TEST(uuid, incorrect_string_representation) {
+    auto uuidText = "foo";
+
+    auto resOpt = ignite::uuid::from_string(uuidText);
+
+    EXPECT_FALSE(resOpt.has_value());
+}
+
+class uuid_string_presentation_fixture: public 
::testing::TestWithParam<std::tuple<std::string, ignite::uuid>> {};
+
+TEST_P(uuid_string_presentation_fixture, from_string) {
+    auto [uuidText, uuid] = GetParam();
+
+    auto resOpt = ignite::uuid::from_string(uuidText);
+
+    ASSERT_TRUE(resOpt.has_value());
+
+    EXPECT_EQ(uuid.get_most_significant_bits(), 
resOpt->get_most_significant_bits());
+    EXPECT_EQ(uuid.get_least_significant_bits(), 
resOpt->get_least_significant_bits());
+}
+
+// We check that two representation are consistent both ways.
+TEST_P(uuid_string_presentation_fixture, to_string) {
+    auto [uuidText, uuid] = GetParam();
+
+    std::stringstream ss;
+    ss << uuid;
+
+    EXPECT_EQ(uuidText, ss.str());
+}
+
+TEST_P(uuid_string_presentation_fixture, circular_convertation0) {
+    auto [uuidText, uuid] = GetParam();
+
+    std::stringstream ss;
+    ss << uuid;
+
+    auto convertedText = ss.str();
+
+    auto converterUuid = ignite::uuid::from_string(convertedText);
+
+    EXPECT_EQ(converterUuid, uuid);
+}
+
+TEST_P(uuid_string_presentation_fixture, circular_convertation1) {
+    auto [uuidText, uuid] = GetParam();
+
+    auto converterUuid = ignite::uuid::from_string(uuidText);
+
+    ASSERT_TRUE(converterUuid.has_value());
+
+    std::stringstream ss;
+    ss << *converterUuid;
+
+    auto convertedText = ss.str();
+
+    EXPECT_EQ(convertedText, uuidText);
+}
+
+// Values has taken randomly from java program, as this implementation should 
have same behaviour as java.lang.UUID for compatibility reasons.
+INSTANTIATE_TEST_SUITE_P(
+    various_uuids,
+    uuid_string_presentation_fixture,
+    ::testing::Values(
+        std::tuple{"4b62e46a-d380-460f-94ea-9b4320752634", 
ignite::uuid{5432155248028304911LL, -7716184298936261068LL}},
+        std::tuple{"cabf7114-7763-4ad5-b312-5ffadcbf48c8", 
ignite::uuid{-3837224024780092715, -5543262660289673016}},
+        std::tuple{"8ee62c79-5636-44b9-adc4-c9b4272a2ba6", 
ignite::uuid{-8149777576031271751, -5925389434124358746}},
+        std::tuple{"3e69a946-fc2a-4da1-9938-d1f082dd36ce", 
ignite::uuid{4497311825249586593, -7405938756292888882}},
+        std::tuple{"7cc9f130-821b-4580-bfda-955072540643", 
ignite::uuid{8991983321665455488, -4622217894794361277}},
+        std::tuple{"1f86b0b1-dfab-4108-8b7e-8b3e4177ceb0", 
ignite::uuid{2271697340063236360, -8395119555869421904}},
+        std::tuple{"88cc69d9-d191-482d-b295-aa458cf99167", 
ignite::uuid{-8589374005057599443, -5578365347733859993}},
+        std::tuple{"a8b6298c-126e-4c4d-a521-de3989a31492", 
ignite::uuid{-6289794147994940339, -6547708044516322158}},
+        std::tuple{"03b210a8-5a6a-4cae-b4fc-f30df9f1eebd", 
ignite::uuid{266293643225746606, -5405178211397931331}},
+        std::tuple{"f145e277-bb58-4f89-adfa-3166a975b71b", 
ignite::uuid{-1061193133303771255, -5910357243970865381}},
+        std::tuple{"dc8b5857-067f-486e-aa5b-52cbdb26dd9c", 
ignite::uuid{-2554851232808220562, -6171247828872536676}}
+    )
+);
\ No newline at end of file
diff --git a/modules/platforms/cpp/ignite/network/length_prefix_codec.h 
b/modules/platforms/cpp/ignite/network/length_prefix_codec.h
index 775addf5031..3536b21d69f 100644
--- a/modules/platforms/cpp/ignite/network/length_prefix_codec.h
+++ b/modules/platforms/cpp/ignite/network/length_prefix_codec.h
@@ -19,6 +19,7 @@
 
 #include <ignite/common/ignite_error.h>
 #include <ignite/network/codec.h>
+#include <ignite/common/detail/factory.h>
 
 #include <cstddef>
 #include <vector>
@@ -83,6 +84,6 @@ private:
 };
 
 /** Factory for length_prefix_codec. */
-typedef detail::basic_factory<codec, length_prefix_codec> 
length_prefix_codec_factory;
+typedef ignite::detail::basic_factory<codec, length_prefix_codec> 
length_prefix_codec_factory;
 
 } // namespace ignite::network
diff --git a/modules/platforms/cpp/ignite/protocol/client_operation.h 
b/modules/platforms/cpp/ignite/protocol/client_operation.h
index d54974d91a3..e1b0cba5fa4 100644
--- a/modules/platforms/cpp/ignite/protocol/client_operation.h
+++ b/modules/platforms/cpp/ignite/protocol/client_operation.h
@@ -119,6 +119,9 @@ enum class client_operation {
     /** Close the cursor. */
     SQL_CURSOR_CLOSE = 52,
 
+    /** Get partition assignment. */
+    PARTITION_ASSIGNMENT_GET = 53,
+
     /** Execute SQL script. */
     SQL_EXEC_SCRIPT = 56,
 
diff --git a/modules/platforms/cpp/ignite/protocol/messages.cpp 
b/modules/platforms/cpp/ignite/protocol/messages.cpp
index 41857115d3a..c9442ec321c 100644
--- a/modules/platforms/cpp/ignite/protocol/messages.cpp
+++ b/modules/platforms/cpp/ignite/protocol/messages.cpp
@@ -64,8 +64,8 @@ handshake_response parse_handshake_response(bytes_view 
message) {
         return res;
 
     res.idle_timeout_ms = reader.read_int64();
-    reader.skip(); // Cluster node ID. Needed for partition-aware compute.
-    UNUSED_VALUE reader.read_string_nullable(); // Cluster node name. Needed 
for partition-aware compute.
+    res.node_id = reader.read_uuid();
+    res.node_name = reader.read_string();
 
     auto cluster_ids_len = reader.read_int32();
     if (cluster_ids_len <= 0) {
@@ -100,4 +100,39 @@ handshake_response parse_handshake_response(bytes_view 
message) {
     return res;
 }
 
+void write_partition_assignment_request(writer &writer, std::int32_t table_id, 
std::int64_t timestamp) {
+    writer.write(table_id);
+    writer.write(timestamp);
+}
+
+std::shared_ptr<partition_assignment> 
read_partition_assignment_response(reader &reader, std::int64_t timestamp) {
+    auto cnt = reader.read_int32();
+    if (cnt <= 0)
+        throw ignite_error("Invalid partition count: " + std::to_string(cnt));
+
+    std::vector<std::optional<std::string>> partitions;
+    partitions.reserve(cnt);
+
+    bool assignment_available = reader.read_bool();
+    if (!assignment_available) {
+        // Invalidate the current assignment so that we can retry on the next 
call.
+        // Return an empty array so that per-partition batches can be 
initialized.
+        // We'll get the actual assignment on the next call.
+        partitions.insert(partitions.end(), cnt, std::nullopt);
+        return std::make_shared<partition_assignment>(0, 
std::move(partitions));
+    }
+
+    // Returned timestamp can be newer than requested.
+    std::int64_t ts = reader.read_int64();
+    if (ts < timestamp)
+        throw ignite_error("Returned timestamp is older than requested: " + 
std::to_string(ts) + " < "
+            + std::to_string(timestamp));
+
+    for (std::int32_t i = 0; i < cnt; ++i) {
+        partitions.emplace_back(reader.read_string_nullable());
+    }
+
+    return std::make_shared<partition_assignment>(ts, std::move(partitions));
+}
+
 } // namespace ignite::protocol
diff --git a/modules/platforms/cpp/ignite/protocol/messages.h 
b/modules/platforms/cpp/ignite/protocol/messages.h
index 0a4829f941c..bb1df42785c 100644
--- a/modules/platforms/cpp/ignite/protocol/messages.h
+++ b/modules/platforms/cpp/ignite/protocol/messages.h
@@ -19,6 +19,9 @@
 
 #include "ignite/protocol/protocol_context.h"
 #include "ignite/protocol/protocol_version.h"
+#include "ignite/protocol/writer.h"
+#include "ignite/protocol/reader.h"
+#include "ignite/protocol/partition_assignment.h"
 
 #include "ignite/common/bytes_view.h"
 #include "ignite/common/ignite_error.h"
@@ -66,10 +69,16 @@ struct handshake_response {
     protocol_context context{};
 
     /** Observable timestamp. */
-    std::int64_t observable_timestamp;
+    std::int64_t observable_timestamp{0};
 
     /** Server idle timeout in ms. */
-    std::int64_t idle_timeout_ms;
+    std::int64_t idle_timeout_ms{0};
+
+    /** Node id. */
+    uuid node_id{};
+
+    /** Node name. */
+    std::string node_name;
 };
 
 /**
@@ -91,4 +100,22 @@ std::vector<std::byte> make_handshake_request(
  */
 handshake_response parse_handshake_response(bytes_view message);
 
+/**
+ * Write partition assignment request.
+ *
+ * @param writer Writer.
+ * @param table_id Table ID.
+ * @param timestamp Timestamp.
+ */
+void write_partition_assignment_request(writer &writer, std::int32_t table_id, 
std::int64_t timestamp);
+
+/**
+ * Read assignment response.
+ *
+ * @param reader Reader.
+ * @param timestamp Timestamp.
+ * @return A new assignment.
+ */
+std::shared_ptr<partition_assignment> 
read_partition_assignment_response(reader &reader, std::int64_t timestamp);
+
 } // namespace ignite::protocol
diff --git a/modules/platforms/cpp/ignite/protocol/partition_assignment.h 
b/modules/platforms/cpp/ignite/protocol/partition_assignment.h
new file mode 100644
index 00000000000..a35d034b573
--- /dev/null
+++ b/modules/platforms/cpp/ignite/protocol/partition_assignment.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <vector>
+#include <cstdint>
+
+namespace ignite::protocol {
+
+/**
+ * Partition assignment.
+ */
+class partition_assignment {
+public:
+    partition_assignment() = default;
+
+    /**
+     * Constructor.
+     *
+     * @param timestamp Partition assignment data.
+     * @param partitions Partition assignment encode into vector.
+     */
+    partition_assignment(std::int64_t timestamp, 
std::vector<std::optional<std::string>> partitions)
+        : m_timestamp(timestamp)
+        , m_partitions(std::move(partitions)) {}
+
+    /**
+     * Check whether the assignment is outdated.
+     *
+     * @param actual_timestamp Timestamp.
+     * @return @c true if assignment is outdated.
+     */
+    [[nodiscard]] bool is_outdated(std::int64_t actual_timestamp) const { 
return m_timestamp < actual_timestamp; }
+
+    /**
+     * Partitions. Vector is decoded as following: size of collection is total 
number of partitions,
+     * i-th value means that partition with id == i resides on (primary 
replica of that partition belongs to)
+     * node with provided consisted id (AKA node name).
+     *
+     * @return Vector with partition assignments.
+     */
+    [[nodiscard]] const std::vector<std::optional<std::string>>& 
get_partitions() const {
+        return m_partitions;
+    }
+private:
+    /** Assignment timestamp. */
+    std::int64_t m_timestamp{0};
+
+    /**
+    * Partitions. Vector is decoded as following: size of collection is total 
number of partitions,
+    * i-th value means that partition with id == i resides on (primary replica 
of that partition belongs to)
+    * node with provided consisted id (AKA node name).
+    */
+    std::vector<std::optional<std::string>> m_partitions;
+};
+
+} // namespace ignite::protocol
diff --git a/modules/platforms/cpp/tests/client-test/CMakeLists.txt 
b/modules/platforms/cpp/tests/client-test/CMakeLists.txt
index 715c784bd80..3810b5442b3 100644
--- a/modules/platforms/cpp/tests/client-test/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/client-test/CMakeLists.txt
@@ -33,6 +33,7 @@ set(SOURCES
     ssl_test.cpp
     tables_test.cpp
     transactions_test.cpp
+    partition_awareness_test.cpp
 )
 
 ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common 
ignite3-client)
diff --git a/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h 
b/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
index edc97733394..e09ffeb80c9 100644
--- a/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
+++ b/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
@@ -57,9 +57,11 @@ public:
     inline static const std::string ERROR_JOB = JOBS + "$IgniteExceptionJob";
     inline static const std::string ECHO_JOB = JOBS + "$EchoJob";
     inline static const std::string RETURN_NULL_JOB = JOBS + "$ReturnNullJob";
+    inline static const std::string GET_PART_DISTRIBUTION_JOB = JOBS + 
"$GetPartitionDistributionByTableCppJob";
 
     static constexpr const char *KEY_COLUMN = "KEY";
     static constexpr const char *VAL_COLUMN = "VAL";
+    static constexpr const char *PART_PSEUDOCOLUMN = "__partition_id";
 
     /**
      * Get logger.
@@ -68,6 +70,26 @@ public:
      */
     static std::shared_ptr<gtest_logger> get_logger() { return 
std::make_shared<gtest_logger>(false, true); }
 
+    /**
+     * Get tuple for specified column names and values.
+     *
+     * @param col_names Array with column names, should be same size as @c 
col_values.
+     * @param col_values Array with column values, should be same size as @c 
col_names.
+     * @return Ignite tuple with provided values and columns.
+     */
+    static ignite_tuple get_tuple(const std::vector<std::string>& col_names, 
const std::vector<primitive>& col_values) {
+        assert(col_values.size() == col_names.size());
+        assert(!col_values.empty());
+
+        ignite_tuple res;
+
+        for (size_t i = 0; i < col_names.size(); ++i) {
+            res.set(col_names[i], col_values[i]);
+        }
+
+        return res;
+    }
+
     /**
      * Get tuple for specified column values.
      *
diff --git 
a/modules/platforms/cpp/tests/client-test/partition_awareness_test.cpp 
b/modules/platforms/cpp/tests/client-test/partition_awareness_test.cpp
new file mode 100644
index 00000000000..e789b2d0da2
--- /dev/null
+++ b/modules/platforms/cpp/tests/client-test/partition_awareness_test.cpp
@@ -0,0 +1,737 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "detail/string_extensions.h"
+#include "ignite_runner_suite.h"
+#include "tests/test-common/test_utils.h"
+
+#include "ignite/client/detail/utils.h"
+#include "ignite/client/ignite_client.h"
+#include "ignite/client/ignite_client_configuration.h"
+
+#include "proxy/asio_proxy.h"
+#include "proxy/message_listener.h"
+
+#include <gtest/gtest.h>
+
+namespace ignite {
+
+using namespace std::chrono_literals;
+
+using part_id_type = std::size_t;
+using id_type = std::int64_t;
+
+constexpr id_type MIN_ID = 0;
+constexpr id_type MAX_ID = 1000;
+
+static const std::map<std::string, std::string> NODES_TO_ADDR = {
+    {"org.apache.ignite.internal.runner.app.PlatformTestNodeRunner", 
"127.0.0.1:10942"},
+    {"org.apache.ignite.internal.runner.app.PlatformTestNodeRunner_2", 
"127.0.0.1:10943"}
+};
+
+/**
+ * Parses string-encoded partition distribution.
+ * Example of string:
+ * 
ff979603-fb56-49e9-bc79-7c4487bbbafd=[4];2e14bb82-5cb5-4283-aafd-f2d8552a842a=[5,
 8, 9];
+ * @param encoded String-encoded distribution
+ * @return Conveniently structured distribution.
+ */
+std::map<int64_t, uuid> parse_partition_distribution(const std::string& 
encoded) {
+    std::map<int64_t, uuid> res;
+
+    size_t lhs = 0;
+    size_t rhs = std::string::npos;
+    while ((rhs = encoded.find(';', lhs)) != std::string::npos) {
+        std::string chunk = encoded.substr(lhs, rhs - lhs);
+
+        auto eq_pos = chunk.find('=');
+
+        if (eq_pos == std::string::npos) {
+            throw std::runtime_error("Incorrect partition distribution text:" 
+ encoded);
+        }
+
+        auto node_id = uuid::from_string(chunk.substr(0, eq_pos));
+
+        if (!node_id.has_value()) {
+            throw std::runtime_error("can't parse partition distribution, 
incorrect input:" + encoded);
+        }
+
+        std::string num_list = chunk.substr(eq_pos + 1); // e.g. [1,2,3]
+
+        size_t lo = 1;
+        size_t hi = std::string::npos;
+        while ((hi = num_list.find_first_of(",]", lo)) != std::string::npos) {
+            int64_t part_id = std::stoll(num_list.substr(lo, hi - lo));
+            res[part_id] = *node_id;
+            lo = hi + 1;
+        }
+        lhs = rhs + 1;
+    }
+
+    return res;
+}
+
+template<typename T>
+class partition_awareness_test : public ignite_runner_suite {
+private:
+    T tab_info{};
+protected:
+    void SetUp() override {
+        ignite_client_configuration client_cfg;
+        client_cfg.set_endpoints(get_node_addrs());
+
+        m_direct_client = ignite_client::start(client_cfg, 5s);
+
+        auto nodes = m_direct_client.get_cluster_nodes();
+
+        for (auto& node : nodes) {
+            if (NODES_TO_ADDR.count(node.get_name())) {
+                m_endpoint_to_node_id[NODES_TO_ADDR.at(node.get_name())] = 
node.get_id();
+            }
+        }
+
+        ASSERT_EQ(m_endpoint_to_node_id.size(), get_node_addrs().size())
+            << "Incorrect node names or address!"
+            << "This test should be able to connect to all non-ssl nodes";
+
+        drop_table_if_exists();
+
+        create_table();
+
+        collect_partition_distribution();
+
+        setup_proxy();
+    }
+
+    void TearDown() override {
+        drop_table_if_exists();
+    }
+
+    ignite_tuple key_tup(id_type key) {
+        return get_tuple(tab_info.key_column_names, tab_info.get_pk_vals(key));
+    }
+
+    ignite_tuple main_val_tup(id_type key) {
+        return get_tuple(tab_info.non_key_column_names, 
tab_info.get_main_vals(key));
+    }
+
+    ignite_tuple alt_val_tup(id_type key) {
+        return get_tuple(tab_info.non_key_column_names, 
tab_info.get_alt_vals(key));
+    }
+
+    ignite_tuple main_rec(id_type key) {
+        return get_tuple(tab_info.all_column_names, 
tab_info.get_main_rec(key));
+    }
+
+    ignite_tuple alt_rec(id_type key) {
+        return get_tuple(tab_info.all_column_names, tab_info.get_alt_rec(key));
+    }
+
+    void create_table() {
+        m_direct_client.get_sql().execute(nullptr, nullptr, 
{tab_info.create_table_ddl()}, {});
+    }
+
+    void drop_table_if_exists() {
+        m_direct_client.get_sql().execute(nullptr, nullptr, {"drop table if 
exists " + tab_info.table_name}, {});
+    }
+
+    void collect_partition_distribution() {
+        std::map<id_type, int64_t> key_to_part;
+
+        populate_table();
+
+        std::stringstream sql;
+        sql << "select ID " << ", " << PART_PSEUDOCOLUMN << " from " << 
tab_info.table_name << " order by ID";
+
+        auto rs = m_direct_client.get_sql().execute(nullptr, nullptr, 
{sql.str()}, {});
+        do {
+            auto page = rs.current_page();
+
+            for (auto &rec : page) {
+                auto key = rec.get("ID").get<id_type>();
+                auto part_id = rec.get(PART_PSEUDOCOLUMN).get<int64_t>();
+
+                key_to_part[key] = part_id;
+            }
+
+            if (rs.has_more_pages())
+                rs.fetch_next_page();
+            else
+                break;
+        } while (true);
+
+        auto job_exec = 
m_direct_client.get_compute().submit(job_target::any_node(m_direct_client.get_cluster_nodes()),
+            job_descriptor::builder{GET_PART_DISTRIBUTION_JOB}.build(), 
{tab_info.table_name});
+
+        auto res = job_exec.get_result()->get_primitive().template 
get<std::string>();
+
+        auto part_to_node = parse_partition_distribution(res);
+
+        for (auto [key, part_id] : key_to_part) {
+            m_key_distribution[key] = part_to_node[part_id];
+        }
+
+        clear_table();
+    }
+
+    void populate_table() {
+        auto view = 
m_direct_client.get_tables().get_table(tab_info.table_name)->get_key_value_binary_view();
+
+        for (int64_t key = MIN_ID; key < MAX_ID; ++key) {
+            view.put(nullptr, this->key_tup(key), this->main_val_tup(key));
+        }
+    }
+
+    void clear_table() {
+        std::stringstream sql;
+        sql << "delete from " << tab_info.table_name;
+
+        m_direct_client.get_sql().execute(nullptr, nullptr, {sql.str()}, {});
+    }
+
+    void setup_proxy() {
+        auto srv_endpoints = get_node_addrs();
+
+        std::vector<proxy::configuration> proxy_cfg;
+        proxy_cfg.reserve(srv_endpoints.size());
+
+        std::vector<std::string> proxy_endpoints;
+        proxy_endpoints.reserve(srv_endpoints.size());
+
+        std::uint16_t proxy_port = 50000;
+        for (auto& srv_endpoint: srv_endpoints) {
+            auto node_id = m_endpoint_to_node_id.at(srv_endpoint);
+
+            auto in_listener = std::make_shared<proxy::message_listener>();
+            auto out_listener = std::make_shared<proxy::message_listener>();
+            m_in_listeners[node_id] = in_listener;
+            m_out_listeners[node_id] = out_listener;
+
+            proxy_cfg.emplace_back(proxy_port, srv_endpoint, in_listener, 
out_listener);
+            proxy_endpoints.push_back("127.0.0.1:" + 
std::to_string(proxy_port));
+
+            proxy_port++;
+        }
+
+        proxy = std::make_unique<proxy::asio_proxy>(proxy_cfg, get_logger());
+
+        ignite_client_configuration client_cfg;
+        client_cfg.set_endpoints(proxy_endpoints);
+        client_cfg.set_logger(get_logger());
+
+        m_client = ignite_client::start(client_cfg, 5s);
+
+        m_kv_view = 
m_client.get_tables().get_table(tab_info.table_name)->get_key_value_binary_view();
+        m_rec_view = 
m_client.get_tables().get_table(tab_info.table_name)->get_record_binary_view();
+
+        {
+            // we initiate initial partition assignment load to reduce tests 
flakiness on first records
+            auto res = m_kv_view.get(nullptr, this->key_tup(42));
+
+            auto start = std::chrono::steady_clock::now();
+
+            int total = 0;
+            while (total == 0) {
+                for (auto &[_, node_id] : m_endpoint_to_node_id) {
+                    total += count_op_by_node_id(node_id, 
protocol::client_operation::PARTITION_ASSIGNMENT_GET);
+                }
+
+                if (std::chrono::steady_clock::now() - start < 3s) {
+                    std::this_thread::yield();
+                } else {
+                    FAIL() << "Test didn't wait for partition assignment";
+                }
+            }
+        }
+    }
+
+    void toggle_message_registration(bool enable) {
+        for (auto &[_, listener] : m_in_listeners) {
+            listener->toggle_message_registration(enable);
+        }
+
+        for (auto &[_, listener] : m_out_listeners) {
+            listener->toggle_message_registration(enable);
+        }
+    }
+
+    /**
+     * Count how many operations was sent directly to desired node.
+     * @param node_id Node id.
+     * @return Number of operations.
+     */
+    int count_op_by_node_id(uuid node_id, protocol::client_operation op) {
+        auto in_listener = m_in_listeners.at(node_id);
+
+        int found = 0;
+        while (true) {
+            auto msgs = in_listener->get_next<proxy::client_message>();
+
+            if (msgs.empty()) {
+                break;
+            }
+
+            for (auto &msg : msgs) {
+                get_logger()->log_debug("Processed: node_id=" + 
detail::to_string(node_id)
+                    + " req_id=" + std::to_string(msg.get_req_id()) + " op=" + 
std::to_string(int(msg.get_op())));
+
+                if (msg.get_op() == op) {
+                    found++;
+                }
+            }
+        }
+
+        return found;
+    }
+
+    void check_messages_sent_to_correct_nodes(id_type key, 
ignite::protocol::client_operation op) {
+        auto this_node_id = m_key_distribution.at(key);
+
+        // check if client connected to this node
+        bool connected_to_this_node = m_in_listeners.count(this_node_id);
+
+        int total = 0;
+        for (auto &[ep, node_id] : m_endpoint_to_node_id) {
+            auto cnt = count_op_by_node_id(node_id, op);
+
+            total += cnt;
+            if (connected_to_this_node) {
+                if (this_node_id == node_id) {
+                    EXPECT_EQ(1, cnt) << "Key " << key << " was not found 
among messages to correct node";
+                } else if (this_node_id != node_id) {
+                    EXPECT_EQ(0, cnt) << "Key " << key << " was found among 
messages to incorrect node";
+                }
+            }
+        }
+
+        if (!connected_to_this_node) {
+            EXPECT_EQ(1, total) << "Key " << key
+                                << " was not found among messages to connected 
nodes or message was duplicated";
+        }
+    }
+
+    /**
+     * Client which connected directly.
+     */
+    ignite_client m_direct_client;
+
+    std::map<std::string, uuid> m_endpoint_to_node_id;
+
+    std::map<id_type, uuid> m_key_distribution;
+
+    /**
+     * Client which connect through the proxy.
+     */
+    ignite_client m_client;
+
+    key_value_view<ignite_tuple, ignite_tuple> m_kv_view;
+    record_view<ignite_tuple> m_rec_view;
+
+    std::unique_ptr<proxy::asio_proxy> proxy;
+
+    std::map<uuid, std::shared_ptr<proxy::message_listener>> m_in_listeners;
+    std::map<uuid, std::shared_ptr<proxy::message_listener>> m_out_listeners;
+};
+
+struct SimpleType {
+    std::string table_name = "simple_table";
+    std::vector<std::string> key_column_names = {"ID"};
+    std::vector<std::string> collocation_column_names = {"ID"};
+    std::vector<std::string> non_key_column_names = {"TEXT"};
+    std::vector<std::string> all_column_names{};
+
+    SimpleType() {
+        all_column_names.reserve(key_column_names.size() + 
non_key_column_names.size());
+        all_column_names.insert(all_column_names.end(), 
key_column_names.begin(), key_column_names.end());
+        all_column_names.insert(all_column_names.end(), 
non_key_column_names.begin(), non_key_column_names.end());
+    }
+
+    static std::string create_table_ddl() {
+        return "create table simple_table (id bigint primary key, text 
varchar)";
+    }
+
+    static std::vector<primitive> get_pk_vals(id_type key) {
+        return {key};
+    }
+
+    /**
+     * Values of columns which is part of collocation key.
+     */
+    static std::vector<primitive> get_ck_values(id_type key) {
+        return {key};
+    }
+
+    static std::vector<primitive> get_main_vals(id_type key) {
+        return {std::to_string(key)};
+    }
+
+    static std::vector<primitive> get_alt_vals(id_type key) {
+        return {std::to_string(key + MAX_ID)};
+    }
+
+    static std::vector<primitive> get_main_rec(id_type key) {
+        std::vector<primitive> res;
+
+        auto key_part = get_pk_vals(key);
+        auto non_key_part = get_main_vals(key);
+
+        res.reserve(key_part.size() + non_key_part.size());
+
+        res.insert(res.end(), std::make_move_iterator(key_part.begin()), 
std::make_move_iterator(key_part.end()));
+        res.insert(res.end(), std::make_move_iterator(non_key_part.begin()), 
std::make_move_iterator(non_key_part.end()));
+
+        return res;
+    }
+
+    static std::vector<primitive> get_alt_rec(id_type key) {
+        std::vector<primitive> res;
+
+        auto key_part = get_pk_vals(key);
+        auto non_key_part = get_alt_vals(key);
+
+        res.reserve(key_part.size() + non_key_part.size());
+
+        res.insert(res.end(), std::make_move_iterator(key_part.begin()), 
std::make_move_iterator(key_part.end()));
+        res.insert(res.end(), std::make_move_iterator(non_key_part.begin()), 
std::make_move_iterator(non_key_part.end()));
+
+        return res;
+    }
+};
+
+struct TypeWithCollocationKey {
+    std::string table_name = "table_with_collocation_key";
+    std::vector<std::string> key_column_names = {"ID", "DEPT_ID"};
+    std::vector<std::string> collocation_column_names = {"DEPT_ID"};
+    std::vector<std::string> non_key_column_names = {"TEXT"};
+    std::vector<std::string> all_column_names{};
+
+    TypeWithCollocationKey() {
+        all_column_names.reserve(key_column_names.size() + 
non_key_column_names.size());
+        all_column_names.insert(all_column_names.end(), 
key_column_names.begin(), key_column_names.end());
+        all_column_names.insert(all_column_names.end(), 
non_key_column_names.begin(), non_key_column_names.end());
+    }
+
+    static std::string create_table_ddl() {
+        return "create table table_with_collocation_key "
+               "(id bigint, dept_id bigint, text varchar, primary key (id, 
dept_id)) "
+               "colocate by (dept_id)";
+    }
+
+    static std::vector<primitive> get_pk_vals(id_type key) {
+        return {key, -key};
+    }
+
+    /**
+     * Values of columns which is part of collocation key.
+     */
+    static std::vector<primitive> get_ck_values(id_type key) {
+        return {-key};
+    }
+
+    static std::vector<primitive> get_main_vals(id_type key) {
+        return {std::to_string(key)};
+    }
+
+    static std::vector<primitive> get_alt_vals(id_type key) {
+        return {std::to_string(key + MAX_ID)};
+    }
+
+    static std::vector<primitive> get_main_rec(id_type key) {
+        std::vector<primitive> res;
+
+        auto key_part = get_pk_vals(key);
+        auto non_key_part = get_main_vals(key);
+
+        res.reserve(key_part.size() + non_key_part.size());
+
+        res.insert(res.end(), std::make_move_iterator(key_part.begin()), 
std::make_move_iterator(key_part.end()));
+        res.insert(res.end(), std::make_move_iterator(non_key_part.begin()), 
std::make_move_iterator(non_key_part.end()));
+
+        return res;
+    }
+
+    static std::vector<primitive> get_alt_rec(id_type key) {
+        std::vector<primitive> res;
+
+        auto key_part = get_pk_vals(key);
+        auto non_key_part = get_alt_vals(key);
+
+        res.reserve(key_part.size() + non_key_part.size());
+
+        res.insert(res.end(), std::make_move_iterator(key_part.begin()), 
std::make_move_iterator(key_part.end()));
+        res.insert(res.end(), std::make_move_iterator(non_key_part.begin()), 
std::make_move_iterator(non_key_part.end()));
+
+        return res;
+    }
+};
+
+using TestTypes = ::testing::Types<SimpleType, TypeWithCollocationKey>;
+TYPED_TEST_SUITE(partition_awareness_test, TestTypes);
+
+TYPED_TEST(partition_awareness_test, kv_contains) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto val = this->m_kv_view.contains(nullptr, this->key_tup(key));
+
+        EXPECT_TRUE(val);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_CONTAINS_KEY);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, kv_get) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto val = this->m_kv_view.get(nullptr, this->key_tup(key));
+
+        EXPECT_TRUE(val.has_value());
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_GET);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, kv_get_and_put) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto val = this->m_kv_view.get_and_put(nullptr, this->key_tup(key), 
this->alt_val_tup(key));
+
+        EXPECT_TRUE(val.has_value());
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_GET_AND_UPSERT);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, kv_get_and_remove) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto val = this->m_kv_view.get_and_remove(nullptr, this->key_tup(key));
+
+        EXPECT_TRUE(val.has_value());
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_GET_AND_DELETE);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, kv_get_and_replace) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto val = this->m_kv_view.get_and_replace(nullptr, 
this->key_tup(key), this->alt_val_tup(key));
+
+        EXPECT_TRUE(val.has_value());
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_GET_AND_REPLACE);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, kv_replace) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        bool success = this->m_kv_view.replace(nullptr, this->key_tup(key), 
this->alt_val_tup(key));
+
+        EXPECT_TRUE(success);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_REPLACE);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, kv_replace_exact) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        bool success = this->m_kv_view.replace(nullptr, this->key_tup(key), 
this->main_val_tup(key), this->alt_val_tup(key));
+
+        EXPECT_TRUE(success);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_REPLACE_EXACT);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, kv_remove) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        bool success = this->m_kv_view.remove(nullptr, this->key_tup(key));
+
+        EXPECT_TRUE(success);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_DELETE);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, kv_remove_exact) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        bool success = this->m_kv_view.remove(nullptr, this->key_tup(key), 
this->main_val_tup(key));
+
+        EXPECT_TRUE(success);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_DELETE_EXACT);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, kv_put) {
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        this->m_kv_view.put(nullptr, this->key_tup(key), 
this->main_val_tup(key));
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_UPSERT);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_get) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto val =  this->m_rec_view.get(nullptr, this->key_tup(key));
+
+        EXPECT_TRUE(val.has_value());
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_GET);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_get_and_upsert) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto val =  this->m_rec_view.get_and_upsert(nullptr, 
this->alt_rec(key));
+
+        EXPECT_TRUE(val.has_value());
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_GET_AND_UPSERT);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_get_and_remove) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto val =  this->m_rec_view.get_and_remove(nullptr, 
this->alt_rec(key));
+
+        EXPECT_TRUE(val.has_value());
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_GET_AND_DELETE);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_get_and_replace) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto val =  this->m_rec_view.get_and_replace(nullptr, 
this->alt_rec(key));
+
+        EXPECT_TRUE(val.has_value());
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_GET_AND_REPLACE);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_replace) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto success =  this->m_rec_view.replace(nullptr, this->alt_rec(key));
+
+        EXPECT_TRUE(success);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_REPLACE);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_replace_exact) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto success =  this->m_rec_view.replace(nullptr, this->main_rec(key), 
this->alt_rec(key));
+
+        EXPECT_TRUE(success);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_REPLACE_EXACT);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_remove) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto success =  this->m_rec_view.remove(nullptr, this->key_tup(key));
+
+        EXPECT_TRUE(success);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_DELETE);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_remove_exact) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto success =  this->m_rec_view.remove_exact(nullptr, 
this->main_rec(key));
+
+        EXPECT_TRUE(success);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_DELETE_EXACT);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_upsert) {
+    this->populate_table();
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+         this->m_rec_view.upsert(nullptr, this->main_rec(key));
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_UPSERT);
+    }
+}
+
+TYPED_TEST(partition_awareness_test, rec_insert) {
+
+    for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+        auto success =  this->m_rec_view.insert(nullptr, this->main_rec(key));
+
+        EXPECT_TRUE(success);
+
+        this->check_messages_sent_to_correct_nodes(key, 
protocol::client_operation::TUPLE_INSERT);
+    }
+}
+
+} // namespace ignite
diff --git a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt 
b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
index 402291e4304..ea4abb5be51 100644
--- a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
@@ -24,4 +24,4 @@ set(SOURCES
     connection_test.cpp
 )
 
-ignite_test(${TARGET} SOURCES ${SOURCES} LIBS asio ignite-test-common 
ignite3-client msgpack-c ignite-protocol ignite-tuple)
\ No newline at end of file
+ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common 
ignite3-client msgpack-c ignite-protocol ignite-tuple)
\ No newline at end of file
diff --git a/modules/platforms/cpp/tests/fake_server/connection_test.cpp 
b/modules/platforms/cpp/tests/fake_server/connection_test.cpp
index 393d6f92338..8d37c2b20af 100644
--- a/modules/platforms/cpp/tests/fake_server/connection_test.cpp
+++ b/modules/platforms/cpp/tests/fake_server/connection_test.cpp
@@ -17,7 +17,7 @@
 
 #include "fake_server.h"
 #include "ignite/client/ignite_client.h"
-#include "proxy/asio_proxy.h"
+#include <proxy/asio_proxy.h>
 #include "tests/client-test/ignite_runner_suite.h"
 
 #include <gtest/gtest.h>
diff --git a/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h 
b/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
deleted file mode 100644
index 92773b60e39..00000000000
--- a/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
+++ /dev/null
@@ -1,38 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-
-#pragma once
-#include <queue>
-#include <utility>
-
-namespace ignite::proxy {
-
-using message = std::vector<char>;
-
-class message_listener {
-public:
-    void register_message(message msg) {
-        m_queue.push(std::move(msg));
-    }
-
-    const std::queue<message>& get_msg_queue() const {
-        return m_queue;
-    }
-
-private:
-    std::queue<message> m_queue{};
-};
-} // namespace ignite::proxy
diff --git a/modules/platforms/cpp/tests/test-common/CMakeLists.txt 
b/modules/platforms/cpp/tests/test-common/CMakeLists.txt
index 4ec7a914f6e..b13587dca33 100644
--- a/modules/platforms/cpp/tests/test-common/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/test-common/CMakeLists.txt
@@ -27,7 +27,7 @@ set(SOURCES
 )
 
 add_library(${TARGET} OBJECT ${SOURCES})
-target_link_libraries(${TARGET} ignite-common)
+target_link_libraries(${TARGET} ignite-common asio msgpack-c)
 target_include_directories(${TARGET} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
 
 set_target_properties(${TARGET} PROPERTIES VERSION ${CMAKE_PROJECT_VERSION})
diff --git a/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h 
b/modules/platforms/cpp/tests/test-common/proxy/asio_proxy.h
similarity index 93%
rename from modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
rename to modules/platforms/cpp/tests/test-common/proxy/asio_proxy.h
index 976f2faa8a8..e5b0dc61758 100644
--- a/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
+++ b/modules/platforms/cpp/tests/test-common/proxy/asio_proxy.h
@@ -93,11 +93,11 @@ public:
         , m_dst(std::move(dst))
         , m_listener(std::move(listener))
         , m_failed(failed)
-        , m_logger(std::move(logger)) {}
+        , m_logger(std::move(logger)) { }
 
     void do_read() {
         m_src->async_read_some(asio::buffer(m_buf, BUFF_SIZE),
-        [self = this->shared_from_this()](const asio::error_code& ec, size_t 
len) {
+        [self = shared_from_this()](const asio::error_code& ec, size_t len) {
             if (ec) {
                 if (ec == asio::error::eof) {
                     return;
@@ -107,7 +107,7 @@ public:
                 self->m_failed.store(true);
             }
 
-            message m{self->m_buf.begin(), self->m_buf.begin() + len};
+            raw_message m{self->m_buf.begin(), self->m_buf.begin() + len};
 
             if (self->m_listener) {
                 self->m_listener->register_message(m);
@@ -117,7 +117,7 @@ public:
         });
     }
 
-    void do_write(message&& msg) {
+    void do_write(raw_message&& msg) {
         asio::async_write(
             *m_dst, asio::buffer(msg.data(), msg.size()),
             [self = shared_from_this()](asio::error_code ec, size_t) {
@@ -154,6 +154,7 @@ public:
         std::shared_ptr<gtest_logger> logger)
         : m_in_sock(std::move(in_sock))
         , m_out_sock(std::move(out_sock))
+        , m_logger(logger)
     {
         m_forward_part = std::make_shared<session_part>(m_in_sock, m_out_sock, 
in_listener, failed, logger);
         m_reverse_part = std::make_shared<session_part>(m_out_sock, m_in_sock, 
out_listener, failed, logger);
@@ -169,6 +170,8 @@ public:
                     );
                 }
 
+                self->m_logger->log_info("Proxy: connected to " + 
e.address().to_string() + ":" + std::to_string(e.port()));
+
                 self->do_serve();
             });
     }
@@ -183,6 +186,8 @@ private:
 
     std::shared_ptr<session_part> m_forward_part;
     std::shared_ptr<session_part> m_reverse_part;
+
+    std::shared_ptr<gtest_logger> m_logger;
 };
 
 class asio_proxy {
@@ -233,6 +238,8 @@ private:
                 throw std::runtime_error("Error accepting incoming connection 
" + ec.message());
             }
 
+            m_logger->log_info("Proxy: accepted connection directed to " + 
entry.m_out_host + ":" + entry.m_out_port);
+
             auto p_in_sock = std::make_shared<tcp::socket>(std::move(in_sock));
             auto p_out_sock = std::make_shared<tcp::socket>(m_io_context);
             auto ses = std::make_shared<session>(
diff --git a/modules/platforms/cpp/tests/test-common/proxy/message_listener.h 
b/modules/platforms/cpp/tests/test-common/proxy/message_listener.h
new file mode 100644
index 00000000000..f7007951a1d
--- /dev/null
+++ b/modules/platforms/cpp/tests/test-common/proxy/message_listener.h
@@ -0,0 +1,179 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "ignite/network/length_prefix_codec.h"
+#include "ignite/protocol/client_operation.h"
+#include "ignite/protocol/reader.h"
+
+#include <atomic>
+#include <cassert>
+#include <queue>
+#include <shared_mutex>
+#include <utility>
+
+namespace ignite::proxy {
+
+using raw_message = std::vector<char>;
+
+class structured_message {
+public:
+    structured_message() = default;
+
+    structured_message(const structured_message& structured_message) = default;
+
+    explicit structured_message(std::vector<std::byte> data)
+        :m_data(std::move(data)) { }
+
+    [[nodiscard]] protocol::reader payload_reader() const {
+        return protocol::reader{{m_data.data() + m_payload_pos, m_data.size() 
- m_payload_pos}};
+    }
+protected:
+    std::vector<std::byte> m_data{};
+    size_t m_payload_pos{};
+};
+
+/**
+ * Message which clients sends to server.
+ */
+class client_message: public structured_message{
+public:
+    client_message() = default;
+
+    client_message(const client_message& msg) = default;
+
+    explicit client_message(std::vector<std::byte> data)
+        : structured_message(std::move(data))
+    {
+        protocol::reader rd{{m_data.data(), m_data.size()}};
+
+        m_op = static_cast<protocol::client_operation>(rd.read_int32());
+        m_req_id = rd.read_int64();
+
+        m_payload_pos = rd.position();
+
+        assert(m_payload_pos <= m_data.size());
+    }
+
+    [[nodiscard]] protocol::client_operation get_op() const { return m_op; }
+
+    [[nodiscard]] int64_t get_req_id() const { return m_req_id; }
+
+private:
+    protocol::client_operation m_op{};
+    int64_t m_req_id{};
+};
+
+/**
+ * Message which server sends to client.
+ */
+class server_message : public structured_message {
+public:
+    server_message() = default;
+
+    server_message(const server_message&) = default;
+
+    explicit server_message(std::vector<std::byte> data)
+        : structured_message(std::move(data))
+    {
+        protocol::reader rd{{m_data.data(), m_data.size()}};
+
+        m_req_id = rd.read_int64();
+        m_flags = rd.read_int32();
+        m_obs_ts = rd.read_int64();
+
+        m_payload_pos = rd.position();
+
+        assert(m_payload_pos <= m_data.size());
+    }
+
+    [[nodiscard]] int64_t get_req_id() const { return m_req_id; }
+
+    [[nodiscard]] int32_t get_flags() const { return m_flags; }
+
+    [[nodiscard]] int64_t get_obs_ts() const { return m_obs_ts; }
+
+private:
+    int64_t m_req_id{};
+    int32_t m_flags{};
+    int64_t m_obs_ts{};
+};
+
+/**
+ * Intercepts messages which go through @c asio_proxy.
+ */
+class message_listener {
+public:
+    void register_message(raw_message msg) {
+        if (enable_message_registration.load()) {
+            std::unique_lock lock(m_mutex);
+            m_queue.push(std::move(msg));
+        }
+    }
+
+    [[nodiscard]] std::queue<raw_message> get_msg_queue() const {
+        std::shared_lock lock(m_mutex);
+        return m_queue;
+    }
+
+    template<typename MESSAGE_TYPE>
+    std::vector<MESSAGE_TYPE> get_next() {
+        std::unique_lock lock(m_mutex);
+
+        std::vector<MESSAGE_TYPE> res;
+
+        while (!m_queue.empty() && res.empty()) {
+            auto& chunk = m_queue.front();
+
+            network::data_buffer_ref buf{{chunk.data(), chunk.size()}};
+
+            while (true) {
+                auto out  = codec.decode(buf);
+
+                if (out.empty()) {
+                    break;
+                }
+
+                auto out_bv = out.get_bytes_view();
+                std::vector<std::byte> data{out_bv.begin(), out_bv.end()};
+                res.emplace_back(std::move(data));
+            }
+            m_queue.pop();
+        }
+
+        return res;
+    }
+
+    /**
+     * Enable/disable message registration for message listeners;
+     * @param enable @c True if registration enabled, otherwise disabled.
+     */
+    void toggle_message_registration(bool enable) {
+        enable_message_registration.store(enable);
+    }
+
+private:
+    std::queue<raw_message> m_queue{};
+
+    mutable std::shared_mutex m_mutex;
+
+    network::length_prefix_codec codec;
+
+    std::atomic_bool enable_message_registration{true};
+};
+} // namespace ignite::proxy
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/runner/app/Jobs.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/runner/app/Jobs.java
index c7a33174435..886d3e125b3 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/runner/app/Jobs.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/runner/app/Jobs.java
@@ -30,8 +30,10 @@ import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -47,6 +49,8 @@ import org.apache.ignite.marshalling.ByteArrayMarshaller;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionDistribution;
 import org.jetbrains.annotations.Nullable;
 
 /** Jobs and marshallers definitions that are used in tests. */
@@ -630,4 +634,29 @@ public class Jobs {
             throw new CustomException(TRACE_ID, COLUMN_NOT_FOUND_ERR, "Custom 
job error", null);
         }
     }
+
+    /**
+     * Job returns actual partition distribution for particular table 
calculated by java side.
+     */
+    public static class GetPartitionDistributionByTableCppJob implements 
ComputeJob<String, String> {
+        @Override
+        public @Nullable CompletableFuture<String> 
executeAsync(JobExecutionContext context, String tableName) {
+            PartitionDistribution pd = 
context.ignite().tables().table(tableName).partitionDistribution();
+
+            Map<UUID, List<Long>> distribution = new HashMap<>();
+            for (Partition partition : pd.partitions()) {
+                var primaryNode = pd.primaryReplica(partition);
+
+                distribution.computeIfAbsent(primaryNode.id(), k -> new 
ArrayList<>()).add(partition.id());
+            }
+
+            StringBuilder sb = new StringBuilder();
+
+            for (Entry<UUID, List<Long>> entry : distribution.entrySet()) {
+                
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(";");
+            }
+
+            return completedFuture(sb.toString());
+        }
+    }
 }


Reply via email to