This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bd26df60e3e IGNITE-22141 C++ Client: Implement Partition Awareness
(#7966)
bd26df60e3e is described below
commit bd26df60e3e5278d7baba573aad8e5dcf44c0488
Author: Ed Rakhmankulov <[email protected]>
AuthorDate: Tue Apr 14 19:12:23 2026 +0300
IGNITE-22141 C++ Client: Implement Partition Awareness (#7966)
---
.../platform_tests/PlatformCppOdbcTestsDebLinux.kt | 1 -
.../platform_tests/PlatformCppOdbcTestsRpmLinux.kt | 1 -
.../platform_tests/PlatformCppOdbcTestsTgzLinux.kt | 1 -
.../test/platform_tests/PlatformCppTestsLinux.kt | 1 -
.../test/platform_tests/PlatformCppTestsWindows.kt | 1 -
.../ignite/client/detail/cluster_connection.cpp | 50 +-
.../cpp/ignite/client/detail/cluster_connection.h | 68 +-
.../client/detail/connection_event_handler.h | 7 +
.../cpp/ignite/client/detail/node_connection.cpp | 7 +-
.../cpp/ignite/client/detail/node_connection.h | 18 +
.../cpp/ignite/client/detail/table/schema.h | 51 +-
.../cpp/ignite/client/detail/table/table_impl.cpp | 224 ++++++-
.../cpp/ignite/client/detail/table/table_impl.h | 35 +
modules/platforms/cpp/ignite/common/uuid.cpp | 52 ++
modules/platforms/cpp/ignite/common/uuid.h | 13 +
modules/platforms/cpp/ignite/common/uuid_test.cpp | 78 +++
.../cpp/ignite/network/length_prefix_codec.h | 3 +-
.../cpp/ignite/protocol/client_operation.h | 3 +
modules/platforms/cpp/ignite/protocol/messages.cpp | 39 +-
modules/platforms/cpp/ignite/protocol/messages.h | 31 +-
.../cpp/ignite/protocol/partition_assignment.h | 74 +++
.../platforms/cpp/tests/client-test/CMakeLists.txt | 1 +
.../cpp/tests/client-test/ignite_runner_suite.h | 22 +
.../tests/client-test/partition_awareness_test.cpp | 737 +++++++++++++++++++++
.../platforms/cpp/tests/fake_server/CMakeLists.txt | 2 +-
.../cpp/tests/fake_server/connection_test.cpp | 2 +-
.../cpp/tests/fake_server/proxy/message_listener.h | 38 --
.../platforms/cpp/tests/test-common/CMakeLists.txt | 2 +-
.../proxy/asio_proxy.h | 15 +-
.../cpp/tests/test-common/proxy/message_listener.h | 179 +++++
.../apache/ignite/internal/runner/app/Jobs.java | 29 +
31 files changed, 1677 insertions(+), 108 deletions(-)
diff --git a/.teamcity/test/platform_tests/PlatformCppOdbcTestsDebLinux.kt
b/.teamcity/test/platform_tests/PlatformCppOdbcTestsDebLinux.kt
index e9435c29f32..0a0e1797158 100644
--- a/.teamcity/test/platform_tests/PlatformCppOdbcTestsDebLinux.kt
+++ b/.teamcity/test/platform_tests/PlatformCppOdbcTestsDebLinux.kt
@@ -20,7 +20,6 @@ object PlatformCppOdbcTestsDebLinux : BuildType({
""".trimIndent()
params {
- param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
param("PATH__CMAKE_BUILD_DIRECTORY",
"%PATH__WORKING_DIR%/cmake-build-debug")
param("PATH__ODBC_TEST_RESULTS",
"%PATH__WORKING_DIR%/odbc_tests_results.xml")
text("PATH__WORKING_DIR",
"%teamcity.build.checkoutDir%/%VCSROOT__IGNITE3%/modules/platforms/cpp",
display = ParameterDisplay.HIDDEN, allowEmpty = true)
diff --git a/.teamcity/test/platform_tests/PlatformCppOdbcTestsRpmLinux.kt
b/.teamcity/test/platform_tests/PlatformCppOdbcTestsRpmLinux.kt
index 40d5a89ea91..6e475d3e9a0 100644
--- a/.teamcity/test/platform_tests/PlatformCppOdbcTestsRpmLinux.kt
+++ b/.teamcity/test/platform_tests/PlatformCppOdbcTestsRpmLinux.kt
@@ -20,7 +20,6 @@ object PlatformCppOdbcTestsRpmLinux : BuildType({
""".trimIndent()
params {
- param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
param("PATH__CMAKE_BUILD_DIRECTORY",
"%PATH__WORKING_DIR%/cmake-build-debug")
param("PATH__ODBC_TEST_RESULTS",
"%PATH__WORKING_DIR%/odbc_tests_results.xml")
text("PATH__WORKING_DIR",
"%teamcity.build.checkoutDir%/%VCSROOT__IGNITE3%/modules/platforms/cpp",
display = ParameterDisplay.HIDDEN, allowEmpty = true)
diff --git a/.teamcity/test/platform_tests/PlatformCppOdbcTestsTgzLinux.kt
b/.teamcity/test/platform_tests/PlatformCppOdbcTestsTgzLinux.kt
index dae6fa5ef03..5bc42a296ae 100644
--- a/.teamcity/test/platform_tests/PlatformCppOdbcTestsTgzLinux.kt
+++ b/.teamcity/test/platform_tests/PlatformCppOdbcTestsTgzLinux.kt
@@ -20,7 +20,6 @@ object PlatformCppOdbcTestsTgzLinux : BuildType({
""".trimIndent()
params {
- param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
param("PATH__CMAKE_BUILD_DIRECTORY",
"%PATH__WORKING_DIR%/cmake-build-debug")
param("PATH__ODBC_TEST_RESULTS",
"%PATH__WORKING_DIR%/odbc_tests_results.xml")
text("PATH__WORKING_DIR",
"%teamcity.build.checkoutDir%/%VCSROOT__IGNITE3%/modules/platforms/cpp",
display = ParameterDisplay.HIDDEN, allowEmpty = true)
diff --git a/.teamcity/test/platform_tests/PlatformCppTestsLinux.kt
b/.teamcity/test/platform_tests/PlatformCppTestsLinux.kt
index 1446d9463c8..1861433b81b 100644
--- a/.teamcity/test/platform_tests/PlatformCppTestsLinux.kt
+++ b/.teamcity/test/platform_tests/PlatformCppTestsLinux.kt
@@ -24,7 +24,6 @@ object PlatformCppTestsLinux : BuildType({
""".trimIndent()
params {
- param("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
param("PATH__CMAKE_BUILD_DIRECTORY",
"%PATH__WORKING_DIR%/cmake-build-debug")
param("PATH__CLIENT_TEST_RESULTS",
"%PATH__WORKING_DIR%/cpp_client_tests_results.xml")
param("PATH__UNIT_TESTS_RESULT",
"%PATH__WORKING_DIR%/cpp_unit_test_results.xml")
diff --git a/.teamcity/test/platform_tests/PlatformCppTestsWindows.kt
b/.teamcity/test/platform_tests/PlatformCppTestsWindows.kt
index ad89eeb12fb..0614a9b7984 100644
--- a/.teamcity/test/platform_tests/PlatformCppTestsWindows.kt
+++ b/.teamcity/test/platform_tests/PlatformCppTestsWindows.kt
@@ -27,7 +27,6 @@ object PlatformCppTestsWindows : BuildType({
""".trimIndent()
params {
- hiddenText("env.IGNITE_CPP_TESTS_USE_SINGLE_NODE", "")
hiddenText("PATH__CMAKE_BUILD_DIRECTORY",
"""%PATH__WORKING_DIR%\cmake-build-debug""")
hiddenText("PATH__CLIENT_TEST_RESULTS",
"""%PATH__CMAKE_BUILD_DIRECTORY%\cpp_client_tests_results.xml""")
hiddenText("PATH__ODBC_TEST_RESULTS",
"""%PATH__CMAKE_BUILD_DIRECTORY%\odbc_tests_results.xml""")
diff --git a/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
b/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
index 075403afdcc..d866cd54e49 100644
--- a/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
@@ -92,7 +92,9 @@ void cluster_connection::stop() {
void cluster_connection::on_connection_success(const end_point &addr, uint64_t
id) {
m_logger->log_info("Established connection with remote host " +
addr.to_string());
- m_logger->log_debug("Connection ID: " + std::to_string(id));
+
+ if (m_logger->is_debug_enabled())
+ m_logger->log_debug("Connection ID: " + std::to_string(id));
auto connection = node_connection::make_new(
id, m_pool, weak_from_this(), m_logger, m_configuration,
m_timer_thread);
@@ -214,6 +216,16 @@ void
cluster_connection::on_observable_timestamp_changed(std::int64_t timestamp)
}
}
+void cluster_connection::on_partition_assignment_changed(std::int64_t
timestamp) {
+ auto expected = m_assignment_timestamp.load();
+ while (expected < timestamp) {
+ auto success = m_assignment_timestamp.compare_exchange_weak(expected,
timestamp);
+ if (success)
+ return;
+ expected = m_assignment_timestamp.load();
+ }
+}
+
void cluster_connection::remove_client(uint64_t id) {
[[maybe_unused]] std::unique_lock<std::recursive_mutex>
lock(m_connections_mutex);
@@ -262,9 +274,28 @@ std::shared_ptr<node_connection>
cluster_connection::get_random_connected_channe
return std::next(m_connections.begin(), idx)->second;
}
+std::shared_ptr<node_connection> cluster_connection::get_channel(
+ const std::optional<std::string>& preferred_node_name) {
+
+ if (preferred_node_name) {
+ std::unique_lock lock(m_connections_mutex);
+ for (auto& [id, conn] : m_connections) {
+ if (conn->get_node_name() == *preferred_node_name) {
+ return conn;
+ }
+ }
+ }
+
+ return get_random_connected_channel();
+}
+
std::pair<std::shared_ptr<node_connection>, std::int64_t>
cluster_connection::perform_request_handler(
- const operation_function_type &op_func, transaction_impl *tx, const
writer_function_type &wr,
- const std::shared_ptr<response_handler> &handler) {
+ const operation_function_type &op_func,
+ transaction_impl *tx,
+ const writer_function_type &wr,
+ const std::shared_ptr<response_handler> &handler,
+ const std::optional<std::string>& preferred_node_name) {
+
if (tx) {
auto channel = tx->get_connection();
if (!channel)
@@ -279,7 +310,7 @@ std::pair<std::shared_ptr<node_connection>, std::int64_t>
cluster_connection::pe
}
while (true) {
- auto channel = get_random_connected_channel();
+ auto channel = get_channel(preferred_node_name);
if (!channel)
throw ignite_error(error::code::CONNECTION, "No nodes connected");
@@ -290,10 +321,15 @@ std::pair<std::shared_ptr<node_connection>, std::int64_t>
cluster_connection::pe
}
}
-void cluster_connection::perform_request_raw(protocol::client_operation op,
transaction_impl *tx,
- const writer_function_type &wr, ignite_callback<bytes_view> callback) {
+void cluster_connection::perform_request_raw(
+ protocol::client_operation op,
+ transaction_impl *tx,
+ const writer_function_type &wr,
+ ignite_callback<bytes_view> callback,
+ const std::optional<std::string>& preferred_node_name) {
auto handler = std::make_shared<response_handler_raw>(std::move(callback));
- perform_request_handler(static_op(op), tx, wr, std::move(handler));
+
+ perform_request_handler(static_op(op), tx, wr, std::move(handler),
preferred_node_name);
}
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
b/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
index d75912d1eae..972b0b8ac76 100644
--- a/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
+++ b/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
@@ -106,10 +106,15 @@ public:
* @param tx Transaction.
* @param wr Request writer function.
* @param handler Request handler.
+ * @param preferred_node_name Name of preferred node.
* @return A connection used to perform request and the request ID.
*/
- std::pair<std::shared_ptr<node_connection>, std::int64_t>
perform_request_handler(const operation_function_type &op_func,
- transaction_impl *tx, const writer_function_type &wr, const
std::shared_ptr<response_handler> &handler);
+ std::pair<std::shared_ptr<node_connection>, std::int64_t>
perform_request_handler(
+ const operation_function_type &op_func,
+ transaction_impl *tx,
+ const writer_function_type &wr,
+ const std::shared_ptr<response_handler> &handler,
+ const std::optional<std::string>& preferred_node_name = std::nullopt);
/**
* Perform request raw.
@@ -119,9 +124,14 @@ public:
* @param tx Transaction.
* @param wr Request writer function.
* @param callback Callback to call on a result.
+ * @param preferred_node_name Name of preferred node.
*/
- void perform_request_raw(protocol::client_operation op, transaction_impl
*tx,
- const writer_function_type &wr, ignite_callback<bytes_view> callback);
+ void perform_request_raw(
+ protocol::client_operation op,
+ transaction_impl *tx,
+ const writer_function_type &wr,
+ ignite_callback<bytes_view> callback,
+ const std::optional<std::string>& preferred_node_name = std::nullopt);
/**
* Perform request raw.
@@ -151,13 +161,15 @@ public:
* @param wr Request writer function.
* @param rd Response reader function.
* @param callback Callback to call on a result.
+ * @param preferred_node_name Name of preferred node.
*/
template<typename T>
std::pair<std::shared_ptr<node_connection>, std::int64_t>
perform_request(protocol::client_operation op,
transaction_impl *tx, const writer_function_type &wr,
- reader_function_type<T> rd, ignite_callback<T> callback) {
+ reader_function_type<T> rd, ignite_callback<T> callback,
+ const std::optional<std::string>& preferred_node_name = std::nullopt) {
auto handler =
std::make_shared<response_handler_reader<T>>(std::move(rd),
std::move(callback));
- return perform_request_handler(static_op(op), tx, wr,
std::move(handler));
+ return perform_request_handler(static_op(op), tx, wr,
std::move(handler), preferred_node_name);
}
/**
@@ -189,7 +201,7 @@ public:
void perform_request(protocol::client_operation op, const
writer_function_type &wr,
std::function<T(protocol::reader &, std::shared_ptr<node_connection>)>
rd, ignite_callback<T> callback) {
auto handler =
std::make_shared<response_handler_reader_connection<T>>(std::move(rd),
std::move(callback));
- perform_request_handler(static_op(op), nullptr, wr,
std::move(handler));
+ perform_request_handler(static_op(op), nullptr, wr,
std::move(handler), std::nullopt);
}
/**
@@ -261,9 +273,13 @@ public:
* @return A connection used to perform request and the request ID.
*/
template<typename T>
- std::pair<std::shared_ptr<node_connection>, std::int64_t>
perform_request_wr(protocol::client_operation op,
- transaction_impl *tx, const writer_function_type &wr,
ignite_callback<T> callback) {
- return perform_request<T>(op, tx, wr, [](protocol::reader &) {},
std::move(callback));
+ std::pair<std::shared_ptr<node_connection>, std::int64_t>
perform_request_wr(
+ protocol::client_operation op,
+ transaction_impl *tx,
+ const writer_function_type &wr,
+ ignite_callback<T> callback,
+ const std::optional<std::string>& preferred_node_name = std::nullopt) {
+ return perform_request<T>(op, tx, wr, [](protocol::reader &) {},
std::move(callback), preferred_node_name);
}
/**
@@ -273,6 +289,20 @@ public:
*/
std::int64_t get_observable_timestamp() const { return
m_observable_timestamp.load(); }
+ /**
+ * Get assignment timestamp.
+ *
+ * @return Assignment timestamp.
+ */
+ std::int64_t get_assignment_timestamp() const { return
m_assignment_timestamp.load(); }
+
+ /**
+ * Get logger.
+ *
+ * @return Logger.
+ */
+ [[nodiscard]] std::shared_ptr<ignite_logger> get_logger() const { return
m_logger; }
+
/**
* @param op Operation code to return.
* @return A function that always returns the same operation.
@@ -289,6 +319,14 @@ private:
*/
std::shared_ptr<node_connection> get_random_connected_channel();
+ /**
+ * Get connection according to provided preference otherwise returns
random node connection.
+ *
+ * @param preferred_node_name Name of preferred node.
+ * @return Node connection.
+ */
+ std::shared_ptr<node_connection> get_channel(const
std::optional<std::string> &preferred_node_name);
+
/**
* Constructor.
*
@@ -342,6 +380,13 @@ private:
*/
void on_observable_timestamp_changed(std::int64_t timestamp) override;
+ /**
+ * Handle partition assignment change.
+ *
+ * @param timestamp Assignment timestamp.
+ */
+ void on_partition_assignment_changed(std::int64_t timestamp) override;
+
/**
* Remove client.
*
@@ -404,6 +449,9 @@ private:
/** Observable timestamp. */
std::atomic_int64_t m_observable_timestamp{0};
+ /** Partition assignment timestamp. */
+ std::atomic_int64_t m_assignment_timestamp{0};
+
/** Timer thread. */
std::shared_ptr<thread_timer> m_timer_thread;
};
diff --git
a/modules/platforms/cpp/ignite/client/detail/connection_event_handler.h
b/modules/platforms/cpp/ignite/client/detail/connection_event_handler.h
index d8e5322ccca..50ed06dce79 100644
--- a/modules/platforms/cpp/ignite/client/detail/connection_event_handler.h
+++ b/modules/platforms/cpp/ignite/client/detail/connection_event_handler.h
@@ -44,6 +44,13 @@ public:
* @param timestamp Timestamp.
*/
virtual void on_observable_timestamp_changed(std::int64_t timestamp) = 0;
+
+ /**
+ * Handle partition assignment change.
+ *
+ * @param timestamp Assignment timestamp.
+ */
+ virtual void on_partition_assignment_changed(std::int64_t timestamp) = 0;
};
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
index 6881ee16a68..6e2219c0604 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
@@ -69,9 +69,12 @@ void node_connection::process_message(bytes_view msg) {
auto req_id = reader.read_int64();
auto flags = reader.read_int32();
+ auto event_handler = m_event_handler.lock();
if (test_flag(flags,
protocol::response_flag::PARTITION_ASSIGNMENT_CHANGED)) {
auto assignment_ts = reader.read_int64();
- UNUSED_VALUE assignment_ts;
+ if (event_handler) {
+ event_handler->on_partition_assignment_changed(assignment_ts);
+ }
}
auto observable_timestamp = reader.read_int64();
@@ -180,6 +183,8 @@ ignite_result<void>
node_connection::process_handshake_rsp(bytes_view msg) {
std::chrono::milliseconds(response.idle_timeout_ms));
m_protocol_context = response.context;
+ m_node_id = response.node_id;
+ m_node_name = response.node_name;
m_handshake_complete = true;
if (m_heartbeat_interval.count()) {
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.h
b/modules/platforms/cpp/ignite/client/detail/node_connection.h
index 5e130e37829..f6146cb88cf 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.h
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.h
@@ -232,8 +232,20 @@ public:
*/
std::shared_ptr<ignite_logger> get_logger() const { return m_logger; }
+ /**
+ * Cancels waiting for over-due responses.
+ */
void handle_timeouts();
+ /**
+ * Name of the node this connection is tethered to.
+ *
+ * @return Name of the node.
+ */
+ const std::string& get_node_name() const {
+ return m_node_name;
+ }
+
private:
/**
* Constructor.
@@ -334,6 +346,12 @@ private:
/** Timer thread. */
std::weak_ptr<thread_timer> m_timer_thread;
+
+ /** Node id. */
+ uuid m_node_id{};
+
+ /** Name of the node this connection is tethered to. */
+ std::string m_node_name{};
};
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/table/schema.h
b/modules/platforms/cpp/ignite/client/detail/table/schema.h
index 71be10ef565..0d8a0c8ad80 100644
--- a/modules/platforms/cpp/ignite/client/detail/table/schema.h
+++ b/modules/platforms/cpp/ignite/client/detail/table/schema.h
@@ -79,6 +79,7 @@ struct schema {
const std::vector<column> columns;
const std::vector<const column *> key_columns;
const std::vector<const column *> val_columns;
+ const std::vector<const column *> collocated_columns;
// Default
schema() = default;
@@ -86,17 +87,23 @@ struct schema {
/**
* Constructor.
*
- * @param version Version.
- * @param columns Columns.
- * @param key_columns Key Columns.
- * @param val_columns Value Columns.
+ * @param version Schema version.
+ * @param columns All columns.
+ * @param key_columns Key columns.
+ * @param val_columns Value columns.
+ * @param collocated_columns Collocated columns.
*/
- schema(std::int32_t version, std::vector<column> &&columns,
std::vector<const column *> &&key_columns,
- std::vector<const column *> &&val_columns)
+ schema(
+ std::int32_t version,
+ std::vector<column> &&columns,
+ std::vector<const column *> &&key_columns,
+ std::vector<const column *> &&val_columns,
+ std::vector<const column *> &&collocated_columns)
: version(version)
, columns(std::move(columns))
, key_columns(std::move(key_columns))
- , val_columns(std::move(val_columns)) {}
+ , val_columns(std::move(val_columns))
+ , collocated_columns(std::move(collocated_columns)) {}
/**
* Get column by index.
@@ -118,30 +125,52 @@ struct schema {
* @return A new schema instance.
*/
static std::shared_ptr<schema> create_instance(std::int32_t version,
std::vector<column> &&cols) {
- std::int32_t key_columns_cnt = 0;
+ std::size_t key_columns_cnt = 0;
+ std::size_t collocated_columns_cnt = 0;
for (const auto &column : cols) {
if (column.is_key())
++key_columns_cnt;
+
+ if (column.colocation_index >= 0)
+ ++collocated_columns_cnt;
}
- std::int32_t val_columns_cnt = std::int32_t(cols.size()) -
key_columns_cnt;
+
+ const size_t val_columns_cnt = cols.size() - key_columns_cnt;
std::vector<const column *> key_columns(key_columns_cnt, nullptr);
std::vector<const column *> val_columns;
val_columns.reserve(val_columns_cnt);
+ assert(collocated_columns_cnt <= key_columns_cnt);
+
+ std::vector<const column *> collocated_columns(collocated_columns_cnt,
nullptr);
+
for (const auto &column : cols) {
if (column.is_key()) {
- assert(column.key_index >= 0 && std::size_t(column.key_index)
< key_columns.size());
+ assert(column.key_index >= 0 &&
static_cast<std::size_t>(column.key_index) < key_columns.size());
assert(key_columns[column.key_index] == nullptr);
key_columns[column.key_index] = &column;
} else {
val_columns.push_back(&column);
}
+
+ if (column.colocation_index >= 0) {
+ assert(static_cast<std::size_t>(column.colocation_index) <
collocated_columns.size());
+ assert(collocated_columns[column.colocation_index] == nullptr);
+
+ collocated_columns[column.colocation_index] = &column;
+ }
}
- return std::make_shared<schema>(version, std::move(cols),
std::move(key_columns), std::move(val_columns));
+ return std::make_shared<schema>(
+ version,
+ std::move(cols),
+ std::move(key_columns),
+ std::move(val_columns),
+ std::move(collocated_columns)
+ );
}
/**
diff --git a/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
b/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
index 19c5059c659..d0b35daf66f 100644
--- a/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
@@ -17,6 +17,7 @@
#include "table_impl.h"
+#include "detail/hash_calculator.h"
#include "ignite/client/detail/transaction/transaction_impl.h"
#include "ignite/client/detail/utils.h"
#include "ignite/client/table/table.h"
@@ -149,6 +150,8 @@ void table_impl::load_schema_async(
void table_impl::get_async(
transaction *tx, const ignite_tuple &key,
ignite_callback<std::optional<ignite_tuple>> callback) {
+ update_partition_assignment();
+
with_proper_schema_async<std::optional<ignite_tuple>>(std::move(callback),
[self = shared_from_this(), key = std::make_shared<ignite_tuple>(key),
tx0 = to_impl(tx)](
const schema &sch, auto callback) mutable {
@@ -165,13 +168,22 @@ void table_impl::get_async(
callback(read_tuple_opt(reader, &sch));
});
+ auto preferred_node = self->get_preferred_node_name(*key, sch);
+
self->m_connection->perform_request_raw(
- protocol::client_operation::TUPLE_GET, tx0.get(), writer_func,
std::move(handle_func));
+ protocol::client_operation::TUPLE_GET,
+ tx0.get(),
+ writer_func,
+ std::move(handle_func),
+ preferred_node
+ );
});
}
void table_impl::contains_async(transaction *tx, const ignite_tuple &key,
ignite_callback<bool> callback) {
+ update_partition_assignment();
+
with_proper_schema_async<bool>(std::move(callback),
[self = shared_from_this(), key = std::make_shared<ignite_tuple>(key),
tx0 = to_impl(tx)](
const schema &sch, auto callback) mutable {
@@ -186,8 +198,16 @@ void table_impl::contains_async(transaction *tx, const
ignite_tuple &key, ignite
return reader.read_bool();
};
-
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_CONTAINS_KEY,
tx0.get(),
- writer_func, std::move(reader_func), std::move(callback));
+ auto preferred_node = self->get_preferred_node_name(*key, sch);
+
+ self->m_connection->perform_request<bool>(
+ protocol::client_operation::TUPLE_CONTAINS_KEY,
+ tx0.get(),
+ writer_func,
+ std::move(reader_func),
+ std::move(callback),
+ preferred_node
+ );
});
}
@@ -213,6 +233,9 @@ void table_impl::get_all_async(transaction *tx,
std::vector<ignite_tuple> keys,
}
void table_impl::upsert_async(transaction *tx, const ignite_tuple &record,
ignite_callback<void> callback) {
+
+ update_partition_assignment();
+
with_proper_schema_async<void>(std::move(callback),
[self = shared_from_this(), record = ignite_tuple(record), tx0 =
to_impl(tx)](
const schema &sch, auto callback) mutable {
@@ -221,8 +244,15 @@ void table_impl::upsert_async(transaction *tx, const
ignite_tuple &record, ignit
write_tuple(writer, sch, record, false);
};
+ auto preferred_node = self->get_preferred_node_name(record, sch);
+
self->m_connection->perform_request_wr(
- protocol::client_operation::TUPLE_UPSERT, tx0.get(),
writer_func, std::move(callback));
+ protocol::client_operation::TUPLE_UPSERT,
+ tx0.get(),
+ writer_func,
+ std::move(callback),
+ preferred_node
+ );
});
}
@@ -244,12 +274,14 @@ void table_impl::upsert_all_async(transaction *tx,
std::vector<ignite_tuple> rec
void table_impl::get_and_upsert_async(
transaction *tx, const ignite_tuple &record,
ignite_callback<std::optional<ignite_tuple>> callback) {
+ update_partition_assignment();
+
with_proper_schema_async<std::optional<ignite_tuple>>(std::move(callback),
- [self = shared_from_this(), record =
std::make_shared<ignite_tuple>(record), tx0 = to_impl(tx)](
+ [self = shared_from_this(), record, tx0 = to_impl(tx)](
const schema &sch, auto callback) {
auto writer_func = [self, record, &sch, &tx0](protocol::writer
&writer, auto&) {
write_table_operation_header(writer, self->m_id, tx0.get(),
sch);
- write_tuple(writer, sch, *record, false);
+ write_tuple(writer, sch, record, false);
};
auto handle_func =
make_schema_handler_function<std::optional<ignite_tuple>>(
@@ -257,12 +289,22 @@ void table_impl::get_and_upsert_async(
callback(read_tuple_opt(reader, &sch));
});
+ auto preferred_node = self->get_preferred_node_name(record, sch);
+
self->m_connection->perform_request_raw(
- protocol::client_operation::TUPLE_GET_AND_UPSERT, tx0.get(),
writer_func, std::move(handle_func));
+ protocol::client_operation::TUPLE_GET_AND_UPSERT,
+ tx0.get(),
+ writer_func,
+ std::move(handle_func),
+ preferred_node
+ );
});
}
void table_impl::insert_async(transaction *tx, const ignite_tuple &record,
ignite_callback<bool> callback) {
+
+ update_partition_assignment();
+
with_proper_schema_async<bool>(std::move(callback),
[self = shared_from_this(), record = ignite_tuple(record), tx0 =
to_impl(tx)](
const schema &sch, auto callback) mutable {
@@ -277,8 +319,16 @@ void table_impl::insert_async(transaction *tx, const
ignite_tuple &record, ignit
return reader.read_bool();
};
-
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_INSERT,
tx0.get(), writer_func,
- std::move(reader_func), std::move(callback));
+ auto preferred_node = self->get_preferred_node_name(record, sch);
+
+ self->m_connection->perform_request<bool>(
+ protocol::client_operation::TUPLE_INSERT,
+ tx0.get(),
+ writer_func,
+ std::move(reader_func),
+ std::move(callback),
+ preferred_node
+ );
});
}
@@ -305,10 +355,13 @@ void table_impl::insert_all_async(
}
void table_impl::replace_async(transaction *tx, const ignite_tuple &record,
ignite_callback<bool> callback) {
+
+ update_partition_assignment();
+
with_proper_schema_async<bool>(std::move(callback),
[self = shared_from_this(), record = ignite_tuple(record), tx0 =
to_impl(tx)](
const schema &sch, auto callback) mutable {
- auto writer_func = [self, &record, &sch, &tx0](protocol::writer
&writer, auto&) {
+ auto writer_func = [self, &record, &sch, &tx0](protocol::writer
&writer, auto &) {
write_table_operation_header(writer, self->m_id, tx0.get(),
sch);
write_tuple(writer, sch, record, false);
};
@@ -319,13 +372,24 @@ void table_impl::replace_async(transaction *tx, const
ignite_tuple &record, igni
return reader.read_bool();
};
-
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_REPLACE,
tx0.get(), writer_func,
- std::move(reader_func), std::move(callback));
+ auto preferred_node = self->get_preferred_node_name(record, sch);
+
+ self->m_connection->perform_request<bool>(
+ protocol::client_operation::TUPLE_REPLACE,
+ tx0.get(),
+ writer_func,
+ std::move(reader_func),
+ std::move(callback),
+ preferred_node
+ );
});
}
void table_impl::replace_async(
transaction *tx, const ignite_tuple &record, const ignite_tuple
&new_record, ignite_callback<bool> callback) {
+
+ update_partition_assignment();
+
with_proper_schema_async<bool>(std::move(callback),
[self = shared_from_this(), record = ignite_tuple(record), new_record
= ignite_tuple(new_record),
tx0 = to_impl(tx)](const schema &sch, auto callback) mutable {
@@ -341,20 +405,30 @@ void table_impl::replace_async(
return reader.read_bool();
};
-
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_REPLACE_EXACT,
tx0.get(),
- writer_func, std::move(reader_func), std::move(callback));
+ auto preferred_node = self->get_preferred_node_name(record, sch);
+
+ self->m_connection->perform_request<bool>(
+ protocol::client_operation::TUPLE_REPLACE_EXACT,
+ tx0.get(),
+ writer_func,
+ std::move(reader_func),
+ std::move(callback),
+ preferred_node
+ );
});
}
void table_impl::get_and_replace_async(
transaction *tx, const ignite_tuple &record,
ignite_callback<std::optional<ignite_tuple>> callback) {
+ update_partition_assignment();
+
with_proper_schema_async<std::optional<ignite_tuple>>(std::move(callback),
- [self = shared_from_this(), record =
std::make_shared<ignite_tuple>(record), tx0 = to_impl(tx)](
+ [self = shared_from_this(), record, tx0 = to_impl(tx)](
const schema &sch, auto callback) mutable {
auto writer_func = [self, record, &sch, &tx0](protocol::writer
&writer, auto&) {
write_table_operation_header(writer, self->m_id, tx0.get(),
sch);
- write_tuple(writer, sch, *record, false);
+ write_tuple(writer, sch, record, false);
};
auto handle_func =
make_schema_handler_function<std::optional<ignite_tuple>>(
@@ -362,12 +436,22 @@ void table_impl::get_and_replace_async(
callback(read_tuple_opt(reader, &sch));
});
+ auto preferred_node = self->get_preferred_node_name(record, sch);
+
self->m_connection->perform_request_raw(
- protocol::client_operation::TUPLE_GET_AND_REPLACE, tx0.get(),
writer_func, std::move(handle_func));
+ protocol::client_operation::TUPLE_GET_AND_REPLACE,
+ tx0.get(),
+ writer_func,
+ std::move(handle_func),
+ preferred_node
+ );
});
}
void table_impl::remove_async(transaction *tx, const ignite_tuple &key,
ignite_callback<bool> callback) {
+
+ update_partition_assignment();
+
with_proper_schema_async<bool>(std::move(callback),
[self = shared_from_this(), record = ignite_tuple(key), tx0 =
to_impl(tx)](
const schema &sch, auto callback) mutable {
@@ -382,12 +466,23 @@ void table_impl::remove_async(transaction *tx, const
ignite_tuple &key, ignite_c
return reader.read_bool();
};
-
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_DELETE,
tx0.get(), writer_func,
- std::move(reader_func), std::move(callback));
+ auto preferred_node = self->get_preferred_node_name(record, sch);
+
+ self->m_connection->perform_request<bool>(
+ protocol::client_operation::TUPLE_DELETE,
+ tx0.get(),
+ writer_func,
+ std::move(reader_func),
+ std::move(callback),
+ preferred_node
+ );
});
}
void table_impl::remove_exact_async(transaction *tx, const ignite_tuple
&record, ignite_callback<bool> callback) {
+
+ update_partition_assignment();
+
with_proper_schema_async<bool>(std::move(callback),
[self = shared_from_this(), record = ignite_tuple(record), tx0 =
to_impl(tx)](
const schema &sch, auto callback) mutable {
@@ -402,20 +497,30 @@ void table_impl::remove_exact_async(transaction *tx,
const ignite_tuple &record,
return reader.read_bool();
};
-
self->m_connection->perform_request<bool>(protocol::client_operation::TUPLE_DELETE_EXACT,
tx0.get(),
- writer_func, std::move(reader_func), std::move(callback));
+ auto preferred_node = self->get_preferred_node_name(record, sch);
+
+ self->m_connection->perform_request<bool>(
+ protocol::client_operation::TUPLE_DELETE_EXACT,
+ tx0.get(),
+ writer_func,
+ std::move(reader_func),
+ std::move(callback),
+ preferred_node
+ );
});
}
void table_impl::get_and_remove_async(
transaction *tx, const ignite_tuple &key,
ignite_callback<std::optional<ignite_tuple>> callback) {
+ update_partition_assignment();
+
with_proper_schema_async<std::optional<ignite_tuple>>(std::move(callback),
- [self = shared_from_this(), record =
std::make_shared<ignite_tuple>(key), tx0 = to_impl(tx)](
+ [self = shared_from_this(), record = key, tx0 = to_impl(tx)](
const schema &sch, auto callback) mutable {
auto writer_func = [self, record, &sch, &tx0](protocol::writer
&writer, auto&) {
write_table_operation_header(writer, self->m_id, tx0.get(),
sch);
- write_tuple(writer, sch, *record, true);
+ write_tuple(writer, sch, record, true);
};
auto handle_func =
make_schema_handler_function<std::optional<ignite_tuple>>(
@@ -423,8 +528,15 @@ void table_impl::get_and_remove_async(
callback(read_tuple_opt(reader, &sch));
});
+ auto preferred_node = self->get_preferred_node_name(record, sch);
+
self->m_connection->perform_request_raw(
- protocol::client_operation::TUPLE_GET_AND_DELETE, tx0.get(),
writer_func, std::move(handle_func));
+ protocol::client_operation::TUPLE_GET_AND_DELETE,
+ tx0.get(),
+ writer_func,
+ std::move(handle_func),
+ preferred_node
+ );
});
}
@@ -472,4 +584,68 @@ std::shared_ptr<table_impl> table_impl::from_facade(table
&tb) {
return tb.m_impl;
}
+void table_impl::update_partition_assignment() {
+ ignite_callback<std::shared_ptr<protocol::partition_assignment>> callback
= [self=shared_from_this()](auto pa) {
+ if (pa.has_error()) {
+ self->m_connection->get_logger()->log_warning("Error while
updating partition assignment for table '"
+ + self->get_name() + "': " + pa.error().what_str());
+
+ return;
+ }
+
+ std::lock_guard lock(self->m_partitions_mutex);
+ self->m_partition_assignment = std::move(pa).value();
+ };
+
+ load_partition_assignment_async(std::move(callback));
+}
+
+void
table_impl::load_partition_assignment_async(ignite_callback<std::shared_ptr<protocol::partition_assignment>>
callback) {
+ std::int64_t timestamp = m_connection->get_assignment_timestamp();
+
+ auto pa = get_partition_assignment();
+ if (pa && !pa->is_outdated(timestamp)) {
+ m_connection->get_logger()->log_debug("Partition assignment for table
" + get_name() + " is up to date.");
+
+ callback(std::move(pa));
+ return;
+ }
+
+ auto writer_func = [id = m_id, timestamp](protocol::writer &writer, auto&)
{
+ protocol::write_partition_assignment_request(writer, id, timestamp);
+ };
+
+ auto reader_func = [timestamp](protocol::reader &reader) ->
std::shared_ptr<protocol::partition_assignment> {
+ return protocol::read_partition_assignment_response(reader, timestamp);
+ };
+
+
m_connection->perform_request<std::shared_ptr<protocol::partition_assignment>>(
+ protocol::client_operation::PARTITION_ASSIGNMENT_GET,
+ nullptr,
+ writer_func,
+ std::move(reader_func),
+ std::move(callback));
+}
+
+std::optional<std::string> table_impl::get_preferred_node_name(const
ignite_tuple &key_or_rec, const schema &sch) {
+ auto pa = get_partition_assignment();
+
+ if (!pa || pa->get_partitions().empty()) {
+ m_connection->get_logger()->log_debug("No partition distribution
available, a random node will be called");
+ return {};
+ }
+
+ hash_calculator hc;
+ for (auto column : sch.collocated_columns) {
+ const auto& val = key_or_rec.get(column->key_index);
+ hc.append(val, column->scale, column->precision);
+ }
+
+ auto hash = hc.result_hash();
+
+ auto part_id = std::abs(hash %
static_cast<int32_t>(pa->get_partitions().size()));
+
+ return pa->get_partitions()[part_id];
+}
+
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
b/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
index f99adca4640..47a84280553 100644
--- a/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
@@ -23,6 +23,8 @@
#include "ignite/client/table/qualified_name.h"
#include "ignite/client/transaction/transaction.h"
+#include "ignite/protocol/partition_assignment.h"
+
#include <memory>
#include <mutex>
#include <unordered_map>
@@ -364,6 +366,7 @@ public:
* @return Implementation.
*/
[[nodiscard]] static std::shared_ptr<table_impl> from_facade(table &tb);
+ void update_partition_assignment();
/**
* Get table ID.
@@ -409,6 +412,32 @@ private:
m_schemas[val->version] = val;
}
+ /**
+ * Get partition assignment.
+ *
+ * @return Partition assignment.
+ */
+ std::shared_ptr<protocol::partition_assignment> get_partition_assignment()
{
+ std::lock_guard lock(m_partitions_mutex);
+ auto assignment = m_partition_assignment;
+ return assignment;
+ }
+
+ /**
+ * Load partition assignment.
+ *
+ * @param callback Callback to call with the actual assignment.
+ */
+ void
load_partition_assignment_async(ignite_callback<std::shared_ptr<protocol::partition_assignment>>
callback);
+
+ /**
+ * Returns name of node which should contain primary replica for partition
record is belongs to.
+ * @param key_or_rec Key part of record or whole record.
+ * @param sch Schema
+ * @return Node name with replica or @c std::nullopt.
+ */
+ std::optional<std::string> get_preferred_node_name(const ignite_tuple
&key_or_rec, const schema &sch);
+
/**
* Get impl of transaction.
* @param tx Transaction.
@@ -433,6 +462,12 @@ private:
/** Schemas. */
std::unordered_map<int32_t, std::shared_ptr<schema>> m_schemas;
+
+ /** Partitions mutex. */
+ std::recursive_mutex m_partitions_mutex;
+
+ /** Partition assignment. */
+ std::shared_ptr<protocol::partition_assignment> m_partition_assignment;
};
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/common/uuid.cpp
b/modules/platforms/cpp/ignite/common/uuid.cpp
index 22bd65d0fa8..5038dbf37e3 100644
--- a/modules/platforms/cpp/ignite/common/uuid.cpp
+++ b/modules/platforms/cpp/ignite/common/uuid.cpp
@@ -22,6 +22,58 @@
namespace ignite {
+std::optional<uuid> uuid::from_string(const std::string &text) {
+ if (text.length() != 36) {
+ return {};
+ }
+
+ auto str = text.c_str();
+
+ auto parse_chunk = [str](size_t beg, size_t end, uint64_t &out) -> bool {
+ char *p;
+
+ if (errno != 0) {
+ errno = 0;
+ }
+
+ out = std::strtoull(str + beg, &p, 16);
+
+ if (p != str + end || (*p != '-' && *p != '\0') || errno == ERANGE) {
+ return false;
+ }
+
+ return true;
+ };
+
+ std::uint64_t msb1, msb2, msb3;
+ std::uint64_t lsb1, lsb2;
+
+ if (!parse_chunk(0, 8, msb1)) {
+ return {};
+ }
+
+ if (!parse_chunk(9, 13, msb2)) {
+ return {};
+ }
+
+ if (!parse_chunk(14, 18, msb3)) {
+ return {};
+ }
+
+ if (!parse_chunk(19, 23, lsb1)) {
+ return {};
+ }
+
+ if (!parse_chunk(24, 36, lsb2)) {
+ return {};
+ }
+
+ uint64_t msb = msb1 << 32 | msb2 << 16 | msb3;
+ uint64_t lsb = lsb1 << 48 | lsb2;
+
+ return uuid{static_cast<int64_t>(msb), static_cast<int64_t>(lsb)};
+}
+
uuid uuid::random() {
static std::mutex random_mutex;
static std::random_device rd;
diff --git a/modules/platforms/cpp/ignite/common/uuid.h
b/modules/platforms/cpp/ignite/common/uuid.h
index abb01a874ec..b81710bb858 100644
--- a/modules/platforms/cpp/ignite/common/uuid.h
+++ b/modules/platforms/cpp/ignite/common/uuid.h
@@ -17,10 +17,14 @@
#pragma once
+#include <cerrno>
#include <cstdint>
+#include <cstdlib>
#include <iomanip>
#include <istream>
+#include <optional>
#include <ostream>
+#include <string>
namespace ignite {
@@ -46,6 +50,15 @@ public:
: most(most)
, least(least) {}
+ /**
+ * Parses string-encoded uuid.
+ *
+ * Example of possible input value: ff979603-fb56-49e9-bc79-7c4487bbbafd.
+ * @param text String-encoded uuid.
+ * @return @c uuid object if parsing was successful, otherwise empty
optional.
+ */
+ static std::optional<uuid> from_string(const std::string& text);
+
/**
* Make random UUID.
*
diff --git a/modules/platforms/cpp/ignite/common/uuid_test.cpp
b/modules/platforms/cpp/ignite/common/uuid_test.cpp
index 5770daccc62..25017c59954 100644
--- a/modules/platforms/cpp/ignite/common/uuid_test.cpp
+++ b/modules/platforms/cpp/ignite/common/uuid_test.cpp
@@ -64,3 +64,81 @@ TEST(uuid, stream) {
EXPECT_EQ(uuidString, uuidString2);
}
+
+TEST(uuid, incorrect_string_representation) {
+ auto uuidText = "foo";
+
+ auto resOpt = ignite::uuid::from_string(uuidText);
+
+ EXPECT_FALSE(resOpt.has_value());
+}
+
+class uuid_string_presentation_fixture: public
::testing::TestWithParam<std::tuple<std::string, ignite::uuid>> {};
+
+TEST_P(uuid_string_presentation_fixture, from_string) {
+ auto [uuidText, uuid] = GetParam();
+
+ auto resOpt = ignite::uuid::from_string(uuidText);
+
+ ASSERT_TRUE(resOpt.has_value());
+
+ EXPECT_EQ(uuid.get_most_significant_bits(),
resOpt->get_most_significant_bits());
+ EXPECT_EQ(uuid.get_least_significant_bits(),
resOpt->get_least_significant_bits());
+}
+
+// We check that two representation are consistent both ways.
+TEST_P(uuid_string_presentation_fixture, to_string) {
+ auto [uuidText, uuid] = GetParam();
+
+ std::stringstream ss;
+ ss << uuid;
+
+ EXPECT_EQ(uuidText, ss.str());
+}
+
+TEST_P(uuid_string_presentation_fixture, circular_convertation0) {
+ auto [uuidText, uuid] = GetParam();
+
+ std::stringstream ss;
+ ss << uuid;
+
+ auto convertedText = ss.str();
+
+ auto converterUuid = ignite::uuid::from_string(convertedText);
+
+ EXPECT_EQ(converterUuid, uuid);
+}
+
+TEST_P(uuid_string_presentation_fixture, circular_convertation1) {
+ auto [uuidText, uuid] = GetParam();
+
+ auto converterUuid = ignite::uuid::from_string(uuidText);
+
+ ASSERT_TRUE(converterUuid.has_value());
+
+ std::stringstream ss;
+ ss << *converterUuid;
+
+ auto convertedText = ss.str();
+
+ EXPECT_EQ(convertedText, uuidText);
+}
+
+// Values has taken randomly from java program, as this implementation should
have same behaviour as java.lang.UUID for compatibility reasons.
+INSTANTIATE_TEST_SUITE_P(
+ various_uuids,
+ uuid_string_presentation_fixture,
+ ::testing::Values(
+ std::tuple{"4b62e46a-d380-460f-94ea-9b4320752634",
ignite::uuid{5432155248028304911LL, -7716184298936261068LL}},
+ std::tuple{"cabf7114-7763-4ad5-b312-5ffadcbf48c8",
ignite::uuid{-3837224024780092715, -5543262660289673016}},
+ std::tuple{"8ee62c79-5636-44b9-adc4-c9b4272a2ba6",
ignite::uuid{-8149777576031271751, -5925389434124358746}},
+ std::tuple{"3e69a946-fc2a-4da1-9938-d1f082dd36ce",
ignite::uuid{4497311825249586593, -7405938756292888882}},
+ std::tuple{"7cc9f130-821b-4580-bfda-955072540643",
ignite::uuid{8991983321665455488, -4622217894794361277}},
+ std::tuple{"1f86b0b1-dfab-4108-8b7e-8b3e4177ceb0",
ignite::uuid{2271697340063236360, -8395119555869421904}},
+ std::tuple{"88cc69d9-d191-482d-b295-aa458cf99167",
ignite::uuid{-8589374005057599443, -5578365347733859993}},
+ std::tuple{"a8b6298c-126e-4c4d-a521-de3989a31492",
ignite::uuid{-6289794147994940339, -6547708044516322158}},
+ std::tuple{"03b210a8-5a6a-4cae-b4fc-f30df9f1eebd",
ignite::uuid{266293643225746606, -5405178211397931331}},
+ std::tuple{"f145e277-bb58-4f89-adfa-3166a975b71b",
ignite::uuid{-1061193133303771255, -5910357243970865381}},
+ std::tuple{"dc8b5857-067f-486e-aa5b-52cbdb26dd9c",
ignite::uuid{-2554851232808220562, -6171247828872536676}}
+ )
+);
\ No newline at end of file
diff --git a/modules/platforms/cpp/ignite/network/length_prefix_codec.h
b/modules/platforms/cpp/ignite/network/length_prefix_codec.h
index 775addf5031..3536b21d69f 100644
--- a/modules/platforms/cpp/ignite/network/length_prefix_codec.h
+++ b/modules/platforms/cpp/ignite/network/length_prefix_codec.h
@@ -19,6 +19,7 @@
#include <ignite/common/ignite_error.h>
#include <ignite/network/codec.h>
+#include <ignite/common/detail/factory.h>
#include <cstddef>
#include <vector>
@@ -83,6 +84,6 @@ private:
};
/** Factory for length_prefix_codec. */
-typedef detail::basic_factory<codec, length_prefix_codec>
length_prefix_codec_factory;
+typedef ignite::detail::basic_factory<codec, length_prefix_codec>
length_prefix_codec_factory;
} // namespace ignite::network
diff --git a/modules/platforms/cpp/ignite/protocol/client_operation.h
b/modules/platforms/cpp/ignite/protocol/client_operation.h
index d54974d91a3..e1b0cba5fa4 100644
--- a/modules/platforms/cpp/ignite/protocol/client_operation.h
+++ b/modules/platforms/cpp/ignite/protocol/client_operation.h
@@ -119,6 +119,9 @@ enum class client_operation {
/** Close the cursor. */
SQL_CURSOR_CLOSE = 52,
+ /** Get partition assignment. */
+ PARTITION_ASSIGNMENT_GET = 53,
+
/** Execute SQL script. */
SQL_EXEC_SCRIPT = 56,
diff --git a/modules/platforms/cpp/ignite/protocol/messages.cpp
b/modules/platforms/cpp/ignite/protocol/messages.cpp
index 41857115d3a..c9442ec321c 100644
--- a/modules/platforms/cpp/ignite/protocol/messages.cpp
+++ b/modules/platforms/cpp/ignite/protocol/messages.cpp
@@ -64,8 +64,8 @@ handshake_response parse_handshake_response(bytes_view
message) {
return res;
res.idle_timeout_ms = reader.read_int64();
- reader.skip(); // Cluster node ID. Needed for partition-aware compute.
- UNUSED_VALUE reader.read_string_nullable(); // Cluster node name. Needed
for partition-aware compute.
+ res.node_id = reader.read_uuid();
+ res.node_name = reader.read_string();
auto cluster_ids_len = reader.read_int32();
if (cluster_ids_len <= 0) {
@@ -100,4 +100,39 @@ handshake_response parse_handshake_response(bytes_view
message) {
return res;
}
+void write_partition_assignment_request(writer &writer, std::int32_t table_id,
std::int64_t timestamp) {
+ writer.write(table_id);
+ writer.write(timestamp);
+}
+
+std::shared_ptr<partition_assignment>
read_partition_assignment_response(reader &reader, std::int64_t timestamp) {
+ auto cnt = reader.read_int32();
+ if (cnt <= 0)
+ throw ignite_error("Invalid partition count: " + std::to_string(cnt));
+
+ std::vector<std::optional<std::string>> partitions;
+ partitions.reserve(cnt);
+
+ bool assignment_available = reader.read_bool();
+ if (!assignment_available) {
+ // Invalidate the current assignment so that we can retry on the next
call.
+ // Return an empty array so that per-partition batches can be
initialized.
+ // We'll get the actual assignment on the next call.
+ partitions.insert(partitions.end(), cnt, std::nullopt);
+ return std::make_shared<partition_assignment>(0,
std::move(partitions));
+ }
+
+ // Returned timestamp can be newer than requested.
+ std::int64_t ts = reader.read_int64();
+ if (ts < timestamp)
+ throw ignite_error("Returned timestamp is older than requested: " +
std::to_string(ts) + " < "
+ + std::to_string(timestamp));
+
+ for (std::int32_t i = 0; i < cnt; ++i) {
+ partitions.emplace_back(reader.read_string_nullable());
+ }
+
+ return std::make_shared<partition_assignment>(ts, std::move(partitions));
+}
+
} // namespace ignite::protocol
diff --git a/modules/platforms/cpp/ignite/protocol/messages.h
b/modules/platforms/cpp/ignite/protocol/messages.h
index 0a4829f941c..bb1df42785c 100644
--- a/modules/platforms/cpp/ignite/protocol/messages.h
+++ b/modules/platforms/cpp/ignite/protocol/messages.h
@@ -19,6 +19,9 @@
#include "ignite/protocol/protocol_context.h"
#include "ignite/protocol/protocol_version.h"
+#include "ignite/protocol/writer.h"
+#include "ignite/protocol/reader.h"
+#include "ignite/protocol/partition_assignment.h"
#include "ignite/common/bytes_view.h"
#include "ignite/common/ignite_error.h"
@@ -66,10 +69,16 @@ struct handshake_response {
protocol_context context{};
/** Observable timestamp. */
- std::int64_t observable_timestamp;
+ std::int64_t observable_timestamp{0};
/** Server idle timeout in ms. */
- std::int64_t idle_timeout_ms;
+ std::int64_t idle_timeout_ms{0};
+
+ /** Node id. */
+ uuid node_id{};
+
+ /** Node name. */
+ std::string node_name;
};
/**
@@ -91,4 +100,22 @@ std::vector<std::byte> make_handshake_request(
*/
handshake_response parse_handshake_response(bytes_view message);
+/**
+ * Write partition assignment request.
+ *
+ * @param writer Writer.
+ * @param table_id Table ID.
+ * @param timestamp Timestamp.
+ */
+void write_partition_assignment_request(writer &writer, std::int32_t table_id,
std::int64_t timestamp);
+
+/**
+ * Read assignment response.
+ *
+ * @param reader Reader.
+ * @param timestamp Timestamp.
+ * @return A new assignment.
+ */
+std::shared_ptr<partition_assignment>
read_partition_assignment_response(reader &reader, std::int64_t timestamp);
+
} // namespace ignite::protocol
diff --git a/modules/platforms/cpp/ignite/protocol/partition_assignment.h
b/modules/platforms/cpp/ignite/protocol/partition_assignment.h
new file mode 100644
index 00000000000..a35d034b573
--- /dev/null
+++ b/modules/platforms/cpp/ignite/protocol/partition_assignment.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <optional>
+#include <string>
+#include <vector>
+#include <cstdint>
+
+namespace ignite::protocol {
+
+/**
+ * Partition assignment.
+ */
+class partition_assignment {
+public:
+ partition_assignment() = default;
+
+ /**
+ * Constructor.
+ *
+ * @param timestamp Partition assignment data.
+ * @param partitions Partition assignment encode into vector.
+ */
+ partition_assignment(std::int64_t timestamp,
std::vector<std::optional<std::string>> partitions)
+ : m_timestamp(timestamp)
+ , m_partitions(std::move(partitions)) {}
+
+ /**
+ * Check whether the assignment is outdated.
+ *
+ * @param actual_timestamp Timestamp.
+ * @return @c true if assignment is outdated.
+ */
+ [[nodiscard]] bool is_outdated(std::int64_t actual_timestamp) const {
return m_timestamp < actual_timestamp; }
+
+ /**
+ * Partitions. Vector is decoded as following: size of collection is total
number of partitions,
+ * i-th value means that partition with id == i resides on (primary
replica of that partition belongs to)
+ * node with provided consisted id (AKA node name).
+ *
+ * @return Vector with partition assignments.
+ */
+ [[nodiscard]] const std::vector<std::optional<std::string>>&
get_partitions() const {
+ return m_partitions;
+ }
+private:
+ /** Assignment timestamp. */
+ std::int64_t m_timestamp{0};
+
+ /**
+ * Partitions. Vector is decoded as following: size of collection is total
number of partitions,
+ * i-th value means that partition with id == i resides on (primary replica
of that partition belongs to)
+ * node with provided consisted id (AKA node name).
+ */
+ std::vector<std::optional<std::string>> m_partitions;
+};
+
+} // namespace ignite::protocol
diff --git a/modules/platforms/cpp/tests/client-test/CMakeLists.txt
b/modules/platforms/cpp/tests/client-test/CMakeLists.txt
index 715c784bd80..3810b5442b3 100644
--- a/modules/platforms/cpp/tests/client-test/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/client-test/CMakeLists.txt
@@ -33,6 +33,7 @@ set(SOURCES
ssl_test.cpp
tables_test.cpp
transactions_test.cpp
+ partition_awareness_test.cpp
)
ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common
ignite3-client)
diff --git a/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
b/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
index edc97733394..e09ffeb80c9 100644
--- a/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
+++ b/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
@@ -57,9 +57,11 @@ public:
inline static const std::string ERROR_JOB = JOBS + "$IgniteExceptionJob";
inline static const std::string ECHO_JOB = JOBS + "$EchoJob";
inline static const std::string RETURN_NULL_JOB = JOBS + "$ReturnNullJob";
+ inline static const std::string GET_PART_DISTRIBUTION_JOB = JOBS +
"$GetPartitionDistributionByTableCppJob";
static constexpr const char *KEY_COLUMN = "KEY";
static constexpr const char *VAL_COLUMN = "VAL";
+ static constexpr const char *PART_PSEUDOCOLUMN = "__partition_id";
/**
* Get logger.
@@ -68,6 +70,26 @@ public:
*/
static std::shared_ptr<gtest_logger> get_logger() { return
std::make_shared<gtest_logger>(false, true); }
+ /**
+ * Get tuple for specified column names and values.
+ *
+ * @param col_names Array with column names, should be same size as @c
col_values.
+ * @param col_values Array with column values, should be same size as @c
col_names.
+ * @return Ignite tuple with provided values and columns.
+ */
+ static ignite_tuple get_tuple(const std::vector<std::string>& col_names,
const std::vector<primitive>& col_values) {
+ assert(col_values.size() == col_names.size());
+ assert(!col_values.empty());
+
+ ignite_tuple res;
+
+ for (size_t i = 0; i < col_names.size(); ++i) {
+ res.set(col_names[i], col_values[i]);
+ }
+
+ return res;
+ }
+
/**
* Get tuple for specified column values.
*
diff --git
a/modules/platforms/cpp/tests/client-test/partition_awareness_test.cpp
b/modules/platforms/cpp/tests/client-test/partition_awareness_test.cpp
new file mode 100644
index 00000000000..e789b2d0da2
--- /dev/null
+++ b/modules/platforms/cpp/tests/client-test/partition_awareness_test.cpp
@@ -0,0 +1,737 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "detail/string_extensions.h"
+#include "ignite_runner_suite.h"
+#include "tests/test-common/test_utils.h"
+
+#include "ignite/client/detail/utils.h"
+#include "ignite/client/ignite_client.h"
+#include "ignite/client/ignite_client_configuration.h"
+
+#include "proxy/asio_proxy.h"
+#include "proxy/message_listener.h"
+
+#include <gtest/gtest.h>
+
+namespace ignite {
+
+using namespace std::chrono_literals;
+
+using part_id_type = std::size_t;
+using id_type = std::int64_t;
+
+constexpr id_type MIN_ID = 0;
+constexpr id_type MAX_ID = 1000;
+
+static const std::map<std::string, std::string> NODES_TO_ADDR = {
+ {"org.apache.ignite.internal.runner.app.PlatformTestNodeRunner",
"127.0.0.1:10942"},
+ {"org.apache.ignite.internal.runner.app.PlatformTestNodeRunner_2",
"127.0.0.1:10943"}
+};
+
+/**
+ * Parses string-encoded partition distribution.
+ * Example of string:
+ *
ff979603-fb56-49e9-bc79-7c4487bbbafd=[4];2e14bb82-5cb5-4283-aafd-f2d8552a842a=[5,
8, 9];
+ * @param encoded String-encoded distribution
+ * @return Conveniently structured distribution.
+ */
+std::map<int64_t, uuid> parse_partition_distribution(const std::string&
encoded) {
+ std::map<int64_t, uuid> res;
+
+ size_t lhs = 0;
+ size_t rhs = std::string::npos;
+ while ((rhs = encoded.find(';', lhs)) != std::string::npos) {
+ std::string chunk = encoded.substr(lhs, rhs - lhs);
+
+ auto eq_pos = chunk.find('=');
+
+ if (eq_pos == std::string::npos) {
+ throw std::runtime_error("Incorrect partition distribution text:"
+ encoded);
+ }
+
+ auto node_id = uuid::from_string(chunk.substr(0, eq_pos));
+
+ if (!node_id.has_value()) {
+ throw std::runtime_error("can't parse partition distribution,
incorrect input:" + encoded);
+ }
+
+ std::string num_list = chunk.substr(eq_pos + 1); // e.g. [1,2,3]
+
+ size_t lo = 1;
+ size_t hi = std::string::npos;
+ while ((hi = num_list.find_first_of(",]", lo)) != std::string::npos) {
+ int64_t part_id = std::stoll(num_list.substr(lo, hi - lo));
+ res[part_id] = *node_id;
+ lo = hi + 1;
+ }
+ lhs = rhs + 1;
+ }
+
+ return res;
+}
+
+template<typename T>
+class partition_awareness_test : public ignite_runner_suite {
+private:
+ T tab_info{};
+protected:
+ void SetUp() override {
+ ignite_client_configuration client_cfg;
+ client_cfg.set_endpoints(get_node_addrs());
+
+ m_direct_client = ignite_client::start(client_cfg, 5s);
+
+ auto nodes = m_direct_client.get_cluster_nodes();
+
+ for (auto& node : nodes) {
+ if (NODES_TO_ADDR.count(node.get_name())) {
+ m_endpoint_to_node_id[NODES_TO_ADDR.at(node.get_name())] =
node.get_id();
+ }
+ }
+
+ ASSERT_EQ(m_endpoint_to_node_id.size(), get_node_addrs().size())
+ << "Incorrect node names or address!"
+ << "This test should be able to connect to all non-ssl nodes";
+
+ drop_table_if_exists();
+
+ create_table();
+
+ collect_partition_distribution();
+
+ setup_proxy();
+ }
+
+ void TearDown() override {
+ drop_table_if_exists();
+ }
+
+ ignite_tuple key_tup(id_type key) {
+ return get_tuple(tab_info.key_column_names, tab_info.get_pk_vals(key));
+ }
+
+ ignite_tuple main_val_tup(id_type key) {
+ return get_tuple(tab_info.non_key_column_names,
tab_info.get_main_vals(key));
+ }
+
+ ignite_tuple alt_val_tup(id_type key) {
+ return get_tuple(tab_info.non_key_column_names,
tab_info.get_alt_vals(key));
+ }
+
+ ignite_tuple main_rec(id_type key) {
+ return get_tuple(tab_info.all_column_names,
tab_info.get_main_rec(key));
+ }
+
+ ignite_tuple alt_rec(id_type key) {
+ return get_tuple(tab_info.all_column_names, tab_info.get_alt_rec(key));
+ }
+
+ void create_table() {
+ m_direct_client.get_sql().execute(nullptr, nullptr,
{tab_info.create_table_ddl()}, {});
+ }
+
+ void drop_table_if_exists() {
+ m_direct_client.get_sql().execute(nullptr, nullptr, {"drop table if
exists " + tab_info.table_name}, {});
+ }
+
+ void collect_partition_distribution() {
+ std::map<id_type, int64_t> key_to_part;
+
+ populate_table();
+
+ std::stringstream sql;
+ sql << "select ID " << ", " << PART_PSEUDOCOLUMN << " from " <<
tab_info.table_name << " order by ID";
+
+ auto rs = m_direct_client.get_sql().execute(nullptr, nullptr,
{sql.str()}, {});
+ do {
+ auto page = rs.current_page();
+
+ for (auto &rec : page) {
+ auto key = rec.get("ID").get<id_type>();
+ auto part_id = rec.get(PART_PSEUDOCOLUMN).get<int64_t>();
+
+ key_to_part[key] = part_id;
+ }
+
+ if (rs.has_more_pages())
+ rs.fetch_next_page();
+ else
+ break;
+ } while (true);
+
+ auto job_exec =
m_direct_client.get_compute().submit(job_target::any_node(m_direct_client.get_cluster_nodes()),
+ job_descriptor::builder{GET_PART_DISTRIBUTION_JOB}.build(),
{tab_info.table_name});
+
+ auto res = job_exec.get_result()->get_primitive().template
get<std::string>();
+
+ auto part_to_node = parse_partition_distribution(res);
+
+ for (auto [key, part_id] : key_to_part) {
+ m_key_distribution[key] = part_to_node[part_id];
+ }
+
+ clear_table();
+ }
+
+ void populate_table() {
+ auto view =
m_direct_client.get_tables().get_table(tab_info.table_name)->get_key_value_binary_view();
+
+ for (int64_t key = MIN_ID; key < MAX_ID; ++key) {
+ view.put(nullptr, this->key_tup(key), this->main_val_tup(key));
+ }
+ }
+
+ void clear_table() {
+ std::stringstream sql;
+ sql << "delete from " << tab_info.table_name;
+
+ m_direct_client.get_sql().execute(nullptr, nullptr, {sql.str()}, {});
+ }
+
+ void setup_proxy() {
+ auto srv_endpoints = get_node_addrs();
+
+ std::vector<proxy::configuration> proxy_cfg;
+ proxy_cfg.reserve(srv_endpoints.size());
+
+ std::vector<std::string> proxy_endpoints;
+ proxy_endpoints.reserve(srv_endpoints.size());
+
+ std::uint16_t proxy_port = 50000;
+ for (auto& srv_endpoint: srv_endpoints) {
+ auto node_id = m_endpoint_to_node_id.at(srv_endpoint);
+
+ auto in_listener = std::make_shared<proxy::message_listener>();
+ auto out_listener = std::make_shared<proxy::message_listener>();
+ m_in_listeners[node_id] = in_listener;
+ m_out_listeners[node_id] = out_listener;
+
+ proxy_cfg.emplace_back(proxy_port, srv_endpoint, in_listener,
out_listener);
+ proxy_endpoints.push_back("127.0.0.1:" +
std::to_string(proxy_port));
+
+ proxy_port++;
+ }
+
+ proxy = std::make_unique<proxy::asio_proxy>(proxy_cfg, get_logger());
+
+ ignite_client_configuration client_cfg;
+ client_cfg.set_endpoints(proxy_endpoints);
+ client_cfg.set_logger(get_logger());
+
+ m_client = ignite_client::start(client_cfg, 5s);
+
+ m_kv_view =
m_client.get_tables().get_table(tab_info.table_name)->get_key_value_binary_view();
+ m_rec_view =
m_client.get_tables().get_table(tab_info.table_name)->get_record_binary_view();
+
+ {
+ // we initiate initial partition assignment load to reduce tests
flakiness on first records
+ auto res = m_kv_view.get(nullptr, this->key_tup(42));
+
+ auto start = std::chrono::steady_clock::now();
+
+ int total = 0;
+ while (total == 0) {
+ for (auto &[_, node_id] : m_endpoint_to_node_id) {
+ total += count_op_by_node_id(node_id,
protocol::client_operation::PARTITION_ASSIGNMENT_GET);
+ }
+
+ if (std::chrono::steady_clock::now() - start < 3s) {
+ std::this_thread::yield();
+ } else {
+ FAIL() << "Test didn't wait for partition assignment";
+ }
+ }
+ }
+ }
+
+ void toggle_message_registration(bool enable) {
+ for (auto &[_, listener] : m_in_listeners) {
+ listener->toggle_message_registration(enable);
+ }
+
+ for (auto &[_, listener] : m_out_listeners) {
+ listener->toggle_message_registration(enable);
+ }
+ }
+
+ /**
+ * Count how many operations was sent directly to desired node.
+ * @param node_id Node id.
+ * @return Number of operations.
+ */
+ int count_op_by_node_id(uuid node_id, protocol::client_operation op) {
+ auto in_listener = m_in_listeners.at(node_id);
+
+ int found = 0;
+ while (true) {
+ auto msgs = in_listener->get_next<proxy::client_message>();
+
+ if (msgs.empty()) {
+ break;
+ }
+
+ for (auto &msg : msgs) {
+ get_logger()->log_debug("Processed: node_id=" +
detail::to_string(node_id)
+ + " req_id=" + std::to_string(msg.get_req_id()) + " op=" +
std::to_string(int(msg.get_op())));
+
+ if (msg.get_op() == op) {
+ found++;
+ }
+ }
+ }
+
+ return found;
+ }
+
+ void check_messages_sent_to_correct_nodes(id_type key,
ignite::protocol::client_operation op) {
+ auto this_node_id = m_key_distribution.at(key);
+
+ // check if client connected to this node
+ bool connected_to_this_node = m_in_listeners.count(this_node_id);
+
+ int total = 0;
+ for (auto &[ep, node_id] : m_endpoint_to_node_id) {
+ auto cnt = count_op_by_node_id(node_id, op);
+
+ total += cnt;
+ if (connected_to_this_node) {
+ if (this_node_id == node_id) {
+ EXPECT_EQ(1, cnt) << "Key " << key << " was not found
among messages to correct node";
+ } else if (this_node_id != node_id) {
+ EXPECT_EQ(0, cnt) << "Key " << key << " was found among
messages to incorrect node";
+ }
+ }
+ }
+
+ if (!connected_to_this_node) {
+ EXPECT_EQ(1, total) << "Key " << key
+ << " was not found among messages to connected
nodes or message was duplicated";
+ }
+ }
+
+ /**
+ * Client which connected directly.
+ */
+ ignite_client m_direct_client;
+
+ std::map<std::string, uuid> m_endpoint_to_node_id;
+
+ std::map<id_type, uuid> m_key_distribution;
+
+ /**
+ * Client which connect through the proxy.
+ */
+ ignite_client m_client;
+
+ key_value_view<ignite_tuple, ignite_tuple> m_kv_view;
+ record_view<ignite_tuple> m_rec_view;
+
+ std::unique_ptr<proxy::asio_proxy> proxy;
+
+ std::map<uuid, std::shared_ptr<proxy::message_listener>> m_in_listeners;
+ std::map<uuid, std::shared_ptr<proxy::message_listener>> m_out_listeners;
+};
+
+struct SimpleType {
+ std::string table_name = "simple_table";
+ std::vector<std::string> key_column_names = {"ID"};
+ std::vector<std::string> collocation_column_names = {"ID"};
+ std::vector<std::string> non_key_column_names = {"TEXT"};
+ std::vector<std::string> all_column_names{};
+
+ SimpleType() {
+ all_column_names.reserve(key_column_names.size() +
non_key_column_names.size());
+ all_column_names.insert(all_column_names.end(),
key_column_names.begin(), key_column_names.end());
+ all_column_names.insert(all_column_names.end(),
non_key_column_names.begin(), non_key_column_names.end());
+ }
+
+ static std::string create_table_ddl() {
+ return "create table simple_table (id bigint primary key, text
varchar)";
+ }
+
+ static std::vector<primitive> get_pk_vals(id_type key) {
+ return {key};
+ }
+
+ /**
+ * Values of columns which is part of collocation key.
+ */
+ static std::vector<primitive> get_ck_values(id_type key) {
+ return {key};
+ }
+
+ static std::vector<primitive> get_main_vals(id_type key) {
+ return {std::to_string(key)};
+ }
+
+ static std::vector<primitive> get_alt_vals(id_type key) {
+ return {std::to_string(key + MAX_ID)};
+ }
+
+ static std::vector<primitive> get_main_rec(id_type key) {
+ std::vector<primitive> res;
+
+ auto key_part = get_pk_vals(key);
+ auto non_key_part = get_main_vals(key);
+
+ res.reserve(key_part.size() + non_key_part.size());
+
+ res.insert(res.end(), std::make_move_iterator(key_part.begin()),
std::make_move_iterator(key_part.end()));
+ res.insert(res.end(), std::make_move_iterator(non_key_part.begin()),
std::make_move_iterator(non_key_part.end()));
+
+ return res;
+ }
+
+ static std::vector<primitive> get_alt_rec(id_type key) {
+ std::vector<primitive> res;
+
+ auto key_part = get_pk_vals(key);
+ auto non_key_part = get_alt_vals(key);
+
+ res.reserve(key_part.size() + non_key_part.size());
+
+ res.insert(res.end(), std::make_move_iterator(key_part.begin()),
std::make_move_iterator(key_part.end()));
+ res.insert(res.end(), std::make_move_iterator(non_key_part.begin()),
std::make_move_iterator(non_key_part.end()));
+
+ return res;
+ }
+};
+
+struct TypeWithCollocationKey {
+ std::string table_name = "table_with_collocation_key";
+ std::vector<std::string> key_column_names = {"ID", "DEPT_ID"};
+ std::vector<std::string> collocation_column_names = {"DEPT_ID"};
+ std::vector<std::string> non_key_column_names = {"TEXT"};
+ std::vector<std::string> all_column_names{};
+
+ TypeWithCollocationKey() {
+ all_column_names.reserve(key_column_names.size() +
non_key_column_names.size());
+ all_column_names.insert(all_column_names.end(),
key_column_names.begin(), key_column_names.end());
+ all_column_names.insert(all_column_names.end(),
non_key_column_names.begin(), non_key_column_names.end());
+ }
+
+ static std::string create_table_ddl() {
+ return "create table table_with_collocation_key "
+ "(id bigint, dept_id bigint, text varchar, primary key (id,
dept_id)) "
+ "colocate by (dept_id)";
+ }
+
+ static std::vector<primitive> get_pk_vals(id_type key) {
+ return {key, -key};
+ }
+
+ /**
+ * Values of columns which is part of collocation key.
+ */
+ static std::vector<primitive> get_ck_values(id_type key) {
+ return {-key};
+ }
+
+ static std::vector<primitive> get_main_vals(id_type key) {
+ return {std::to_string(key)};
+ }
+
+ static std::vector<primitive> get_alt_vals(id_type key) {
+ return {std::to_string(key + MAX_ID)};
+ }
+
+ static std::vector<primitive> get_main_rec(id_type key) {
+ std::vector<primitive> res;
+
+ auto key_part = get_pk_vals(key);
+ auto non_key_part = get_main_vals(key);
+
+ res.reserve(key_part.size() + non_key_part.size());
+
+ res.insert(res.end(), std::make_move_iterator(key_part.begin()),
std::make_move_iterator(key_part.end()));
+ res.insert(res.end(), std::make_move_iterator(non_key_part.begin()),
std::make_move_iterator(non_key_part.end()));
+
+ return res;
+ }
+
+ static std::vector<primitive> get_alt_rec(id_type key) {
+ std::vector<primitive> res;
+
+ auto key_part = get_pk_vals(key);
+ auto non_key_part = get_alt_vals(key);
+
+ res.reserve(key_part.size() + non_key_part.size());
+
+ res.insert(res.end(), std::make_move_iterator(key_part.begin()),
std::make_move_iterator(key_part.end()));
+ res.insert(res.end(), std::make_move_iterator(non_key_part.begin()),
std::make_move_iterator(non_key_part.end()));
+
+ return res;
+ }
+};
+
+using TestTypes = ::testing::Types<SimpleType, TypeWithCollocationKey>;
+TYPED_TEST_SUITE(partition_awareness_test, TestTypes);
+
+TYPED_TEST(partition_awareness_test, kv_contains) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto val = this->m_kv_view.contains(nullptr, this->key_tup(key));
+
+ EXPECT_TRUE(val);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_CONTAINS_KEY);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, kv_get) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto val = this->m_kv_view.get(nullptr, this->key_tup(key));
+
+ EXPECT_TRUE(val.has_value());
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_GET);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, kv_get_and_put) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto val = this->m_kv_view.get_and_put(nullptr, this->key_tup(key),
this->alt_val_tup(key));
+
+ EXPECT_TRUE(val.has_value());
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_GET_AND_UPSERT);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, kv_get_and_remove) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto val = this->m_kv_view.get_and_remove(nullptr, this->key_tup(key));
+
+ EXPECT_TRUE(val.has_value());
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_GET_AND_DELETE);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, kv_get_and_replace) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto val = this->m_kv_view.get_and_replace(nullptr,
this->key_tup(key), this->alt_val_tup(key));
+
+ EXPECT_TRUE(val.has_value());
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_GET_AND_REPLACE);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, kv_replace) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ bool success = this->m_kv_view.replace(nullptr, this->key_tup(key),
this->alt_val_tup(key));
+
+ EXPECT_TRUE(success);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_REPLACE);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, kv_replace_exact) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ bool success = this->m_kv_view.replace(nullptr, this->key_tup(key),
this->main_val_tup(key), this->alt_val_tup(key));
+
+ EXPECT_TRUE(success);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_REPLACE_EXACT);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, kv_remove) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ bool success = this->m_kv_view.remove(nullptr, this->key_tup(key));
+
+ EXPECT_TRUE(success);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_DELETE);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, kv_remove_exact) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ bool success = this->m_kv_view.remove(nullptr, this->key_tup(key),
this->main_val_tup(key));
+
+ EXPECT_TRUE(success);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_DELETE_EXACT);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, kv_put) {
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ this->m_kv_view.put(nullptr, this->key_tup(key),
this->main_val_tup(key));
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_UPSERT);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_get) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto val = this->m_rec_view.get(nullptr, this->key_tup(key));
+
+ EXPECT_TRUE(val.has_value());
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_GET);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_get_and_upsert) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto val = this->m_rec_view.get_and_upsert(nullptr,
this->alt_rec(key));
+
+ EXPECT_TRUE(val.has_value());
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_GET_AND_UPSERT);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_get_and_remove) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto val = this->m_rec_view.get_and_remove(nullptr,
this->alt_rec(key));
+
+ EXPECT_TRUE(val.has_value());
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_GET_AND_DELETE);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_get_and_replace) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto val = this->m_rec_view.get_and_replace(nullptr,
this->alt_rec(key));
+
+ EXPECT_TRUE(val.has_value());
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_GET_AND_REPLACE);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_replace) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto success = this->m_rec_view.replace(nullptr, this->alt_rec(key));
+
+ EXPECT_TRUE(success);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_REPLACE);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_replace_exact) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto success = this->m_rec_view.replace(nullptr, this->main_rec(key),
this->alt_rec(key));
+
+ EXPECT_TRUE(success);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_REPLACE_EXACT);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_remove) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto success = this->m_rec_view.remove(nullptr, this->key_tup(key));
+
+ EXPECT_TRUE(success);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_DELETE);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_remove_exact) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto success = this->m_rec_view.remove_exact(nullptr,
this->main_rec(key));
+
+ EXPECT_TRUE(success);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_DELETE_EXACT);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_upsert) {
+ this->populate_table();
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ this->m_rec_view.upsert(nullptr, this->main_rec(key));
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_UPSERT);
+ }
+}
+
+TYPED_TEST(partition_awareness_test, rec_insert) {
+
+ for (id_type key = MIN_ID; key < MAX_ID; ++key) {
+
+ auto success = this->m_rec_view.insert(nullptr, this->main_rec(key));
+
+ EXPECT_TRUE(success);
+
+ this->check_messages_sent_to_correct_nodes(key,
protocol::client_operation::TUPLE_INSERT);
+ }
+}
+
+} // namespace ignite
diff --git a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
index 402291e4304..ea4abb5be51 100644
--- a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
@@ -24,4 +24,4 @@ set(SOURCES
connection_test.cpp
)
-ignite_test(${TARGET} SOURCES ${SOURCES} LIBS asio ignite-test-common
ignite3-client msgpack-c ignite-protocol ignite-tuple)
\ No newline at end of file
+ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common
ignite3-client msgpack-c ignite-protocol ignite-tuple)
\ No newline at end of file
diff --git a/modules/platforms/cpp/tests/fake_server/connection_test.cpp
b/modules/platforms/cpp/tests/fake_server/connection_test.cpp
index 393d6f92338..8d37c2b20af 100644
--- a/modules/platforms/cpp/tests/fake_server/connection_test.cpp
+++ b/modules/platforms/cpp/tests/fake_server/connection_test.cpp
@@ -17,7 +17,7 @@
#include "fake_server.h"
#include "ignite/client/ignite_client.h"
-#include "proxy/asio_proxy.h"
+#include <proxy/asio_proxy.h>
#include "tests/client-test/ignite_runner_suite.h"
#include <gtest/gtest.h>
diff --git a/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
b/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
deleted file mode 100644
index 92773b60e39..00000000000
--- a/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
+++ /dev/null
@@ -1,38 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-
-#pragma once
-#include <queue>
-#include <utility>
-
-namespace ignite::proxy {
-
-using message = std::vector<char>;
-
-class message_listener {
-public:
- void register_message(message msg) {
- m_queue.push(std::move(msg));
- }
-
- const std::queue<message>& get_msg_queue() const {
- return m_queue;
- }
-
-private:
- std::queue<message> m_queue{};
-};
-} // namespace ignite::proxy
diff --git a/modules/platforms/cpp/tests/test-common/CMakeLists.txt
b/modules/platforms/cpp/tests/test-common/CMakeLists.txt
index 4ec7a914f6e..b13587dca33 100644
--- a/modules/platforms/cpp/tests/test-common/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/test-common/CMakeLists.txt
@@ -27,7 +27,7 @@ set(SOURCES
)
add_library(${TARGET} OBJECT ${SOURCES})
-target_link_libraries(${TARGET} ignite-common)
+target_link_libraries(${TARGET} ignite-common asio msgpack-c)
target_include_directories(${TARGET} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
set_target_properties(${TARGET} PROPERTIES VERSION ${CMAKE_PROJECT_VERSION})
diff --git a/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
b/modules/platforms/cpp/tests/test-common/proxy/asio_proxy.h
similarity index 93%
rename from modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
rename to modules/platforms/cpp/tests/test-common/proxy/asio_proxy.h
index 976f2faa8a8..e5b0dc61758 100644
--- a/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
+++ b/modules/platforms/cpp/tests/test-common/proxy/asio_proxy.h
@@ -93,11 +93,11 @@ public:
, m_dst(std::move(dst))
, m_listener(std::move(listener))
, m_failed(failed)
- , m_logger(std::move(logger)) {}
+ , m_logger(std::move(logger)) { }
void do_read() {
m_src->async_read_some(asio::buffer(m_buf, BUFF_SIZE),
- [self = this->shared_from_this()](const asio::error_code& ec, size_t
len) {
+ [self = shared_from_this()](const asio::error_code& ec, size_t len) {
if (ec) {
if (ec == asio::error::eof) {
return;
@@ -107,7 +107,7 @@ public:
self->m_failed.store(true);
}
- message m{self->m_buf.begin(), self->m_buf.begin() + len};
+ raw_message m{self->m_buf.begin(), self->m_buf.begin() + len};
if (self->m_listener) {
self->m_listener->register_message(m);
@@ -117,7 +117,7 @@ public:
});
}
- void do_write(message&& msg) {
+ void do_write(raw_message&& msg) {
asio::async_write(
*m_dst, asio::buffer(msg.data(), msg.size()),
[self = shared_from_this()](asio::error_code ec, size_t) {
@@ -154,6 +154,7 @@ public:
std::shared_ptr<gtest_logger> logger)
: m_in_sock(std::move(in_sock))
, m_out_sock(std::move(out_sock))
+ , m_logger(logger)
{
m_forward_part = std::make_shared<session_part>(m_in_sock, m_out_sock,
in_listener, failed, logger);
m_reverse_part = std::make_shared<session_part>(m_out_sock, m_in_sock,
out_listener, failed, logger);
@@ -169,6 +170,8 @@ public:
);
}
+ self->m_logger->log_info("Proxy: connected to " +
e.address().to_string() + ":" + std::to_string(e.port()));
+
self->do_serve();
});
}
@@ -183,6 +186,8 @@ private:
std::shared_ptr<session_part> m_forward_part;
std::shared_ptr<session_part> m_reverse_part;
+
+ std::shared_ptr<gtest_logger> m_logger;
};
class asio_proxy {
@@ -233,6 +238,8 @@ private:
throw std::runtime_error("Error accepting incoming connection
" + ec.message());
}
+ m_logger->log_info("Proxy: accepted connection directed to " +
entry.m_out_host + ":" + entry.m_out_port);
+
auto p_in_sock = std::make_shared<tcp::socket>(std::move(in_sock));
auto p_out_sock = std::make_shared<tcp::socket>(m_io_context);
auto ses = std::make_shared<session>(
diff --git a/modules/platforms/cpp/tests/test-common/proxy/message_listener.h
b/modules/platforms/cpp/tests/test-common/proxy/message_listener.h
new file mode 100644
index 00000000000..f7007951a1d
--- /dev/null
+++ b/modules/platforms/cpp/tests/test-common/proxy/message_listener.h
@@ -0,0 +1,179 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "ignite/network/length_prefix_codec.h"
+#include "ignite/protocol/client_operation.h"
+#include "ignite/protocol/reader.h"
+
+#include <atomic>
+#include <cassert>
+#include <queue>
+#include <shared_mutex>
+#include <utility>
+
+namespace ignite::proxy {
+
+using raw_message = std::vector<char>;
+
+class structured_message {
+public:
+ structured_message() = default;
+
+ structured_message(const structured_message& structured_message) = default;
+
+ explicit structured_message(std::vector<std::byte> data)
+ :m_data(std::move(data)) { }
+
+ [[nodiscard]] protocol::reader payload_reader() const {
+ return protocol::reader{{m_data.data() + m_payload_pos, m_data.size()
- m_payload_pos}};
+ }
+protected:
+ std::vector<std::byte> m_data{};
+ size_t m_payload_pos{};
+};
+
+/**
+ * Message which clients sends to server.
+ */
+class client_message: public structured_message{
+public:
+ client_message() = default;
+
+ client_message(const client_message& msg) = default;
+
+ explicit client_message(std::vector<std::byte> data)
+ : structured_message(std::move(data))
+ {
+ protocol::reader rd{{m_data.data(), m_data.size()}};
+
+ m_op = static_cast<protocol::client_operation>(rd.read_int32());
+ m_req_id = rd.read_int64();
+
+ m_payload_pos = rd.position();
+
+ assert(m_payload_pos <= m_data.size());
+ }
+
+ [[nodiscard]] protocol::client_operation get_op() const { return m_op; }
+
+ [[nodiscard]] int64_t get_req_id() const { return m_req_id; }
+
+private:
+ protocol::client_operation m_op{};
+ int64_t m_req_id{};
+};
+
+/**
+ * Message which server sends to client.
+ */
+class server_message : public structured_message {
+public:
+ server_message() = default;
+
+ server_message(const server_message&) = default;
+
+ explicit server_message(std::vector<std::byte> data)
+ : structured_message(std::move(data))
+ {
+ protocol::reader rd{{m_data.data(), m_data.size()}};
+
+ m_req_id = rd.read_int64();
+ m_flags = rd.read_int32();
+ m_obs_ts = rd.read_int64();
+
+ m_payload_pos = rd.position();
+
+ assert(m_payload_pos <= m_data.size());
+ }
+
+ [[nodiscard]] int64_t get_req_id() const { return m_req_id; }
+
+ [[nodiscard]] int32_t get_flags() const { return m_flags; }
+
+ [[nodiscard]] int64_t get_obs_ts() const { return m_obs_ts; }
+
+private:
+ int64_t m_req_id{};
+ int32_t m_flags{};
+ int64_t m_obs_ts{};
+};
+
+/**
+ * Intercepts messages which go through @c asio_proxy.
+ */
+class message_listener {
+public:
+ void register_message(raw_message msg) {
+ if (enable_message_registration.load()) {
+ std::unique_lock lock(m_mutex);
+ m_queue.push(std::move(msg));
+ }
+ }
+
+ [[nodiscard]] std::queue<raw_message> get_msg_queue() const {
+ std::shared_lock lock(m_mutex);
+ return m_queue;
+ }
+
+ template<typename MESSAGE_TYPE>
+ std::vector<MESSAGE_TYPE> get_next() {
+ std::unique_lock lock(m_mutex);
+
+ std::vector<MESSAGE_TYPE> res;
+
+ while (!m_queue.empty() && res.empty()) {
+ auto& chunk = m_queue.front();
+
+ network::data_buffer_ref buf{{chunk.data(), chunk.size()}};
+
+ while (true) {
+ auto out = codec.decode(buf);
+
+ if (out.empty()) {
+ break;
+ }
+
+ auto out_bv = out.get_bytes_view();
+ std::vector<std::byte> data{out_bv.begin(), out_bv.end()};
+ res.emplace_back(std::move(data));
+ }
+ m_queue.pop();
+ }
+
+ return res;
+ }
+
+ /**
+ * Enable/disable message registration for message listeners;
+ * @param enable @c True if registration enabled, otherwise disabled.
+ */
+ void toggle_message_registration(bool enable) {
+ enable_message_registration.store(enable);
+ }
+
+private:
+ std::queue<raw_message> m_queue{};
+
+ mutable std::shared_mutex m_mutex;
+
+ network::length_prefix_codec codec;
+
+ std::atomic_bool enable_message_registration{true};
+};
+} // namespace ignite::proxy
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/runner/app/Jobs.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/runner/app/Jobs.java
index c7a33174435..886d3e125b3 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/runner/app/Jobs.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/runner/app/Jobs.java
@@ -30,8 +30,10 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -47,6 +49,8 @@ import org.apache.ignite.marshalling.ByteArrayMarshaller;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionDistribution;
import org.jetbrains.annotations.Nullable;
/** Jobs and marshallers definitions that are used in tests. */
@@ -630,4 +634,29 @@ public class Jobs {
throw new CustomException(TRACE_ID, COLUMN_NOT_FOUND_ERR, "Custom
job error", null);
}
}
+
+ /**
+ * Job returns actual partition distribution for particular table
calculated by java side.
+ */
+ public static class GetPartitionDistributionByTableCppJob implements
ComputeJob<String, String> {
+ @Override
+ public @Nullable CompletableFuture<String>
executeAsync(JobExecutionContext context, String tableName) {
+ PartitionDistribution pd =
context.ignite().tables().table(tableName).partitionDistribution();
+
+ Map<UUID, List<Long>> distribution = new HashMap<>();
+ for (Partition partition : pd.partitions()) {
+ var primaryNode = pd.primaryReplica(partition);
+
+ distribution.computeIfAbsent(primaryNode.id(), k -> new
ArrayList<>()).add(partition.id());
+ }
+
+ StringBuilder sb = new StringBuilder();
+
+ for (Entry<UUID, List<Long>> entry : distribution.entrySet()) {
+
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(";");
+ }
+
+ return completedFuture(sb.toString());
+ }
+ }
}