This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4eb7521fb36 IGNITE-26358 DB API Driver: Heartbeats (#7528)
4eb7521fb36 is described below

commit 4eb7521fb36c7b6908fc3c58ca6e8696b774b676
Author: Igor Sapego <[email protected]>
AuthorDate: Tue Feb 17 15:47:45 2026 +0100

    IGNITE-26358 DB API Driver: Heartbeats (#7528)
---
 modules/platforms/python/cpp_module/module.cpp     |  22 ++-
 .../platforms/python/cpp_module/node_connection.h  | 194 +++++++++++++--------
 .../platforms/python/cpp_module/py_connection.cpp  |  47 +++--
 .../platforms/python/cpp_module/py_connection.h    |   5 +-
 modules/platforms/python/cpp_module/statement.cpp  |  32 ++--
 modules/platforms/python/cpp_module/statement.h    |   6 +-
 modules/platforms/python/cpp_module/utils.cpp      |  31 +++-
 modules/platforms/python/cpp_module/utils.h        |   2 +-
 .../platforms/python/pyignite_dbapi/__init__.py    |   2 +
 modules/platforms/python/tests/conftest.py         |  21 ++-
 modules/platforms/python/tests/test_connect.py     |  42 +++++
 11 files changed, 276 insertions(+), 128 deletions(-)

diff --git a/modules/platforms/python/cpp_module/module.cpp 
b/modules/platforms/python/cpp_module/module.cpp
index fd8fb96959c..e4b143a89a3 100644
--- a/modules/platforms/python/cpp_module/module.cpp
+++ b/modules/platforms/python/cpp_module/module.cpp
@@ -58,8 +58,10 @@ PyObject* make_connection()
 }
 
 PyObject* make_connection(std::vector<ignite::end_point> addresses, const 
char* schema, const char* identity, const char* secret,
-    int page_size, int timeout, bool autocommit, ssl_config &&ssl_cfg) {
-    auto py_conn = make_py_connection(std::move(addresses), schema, identity, 
secret, page_size, timeout, autocommit, std::move(ssl_cfg));
+    int page_size, int timeout, double heartbeat_interval, bool autocommit, 
ssl_config &&ssl_cfg) {
+    auto py_conn = make_py_connection(std::move(addresses), schema, identity, 
secret, page_size, timeout, heartbeat_interval,
+        autocommit, std::move(ssl_cfg));
+
     if (!py_conn)
         return nullptr;
 
@@ -81,12 +83,13 @@ PyObject* pyignite_dbapi_connect(PyObject*, PyObject* args, 
PyObject* kwargs) {
         const_cast<char*>("schema"),
         const_cast<char*>("timezone"),
         const_cast<char*>("timeout"),
+        const_cast<char*>("heartbeat_interval"),
         const_cast<char*>("page_size"),
         const_cast<char*>("autocommit"),
-        "use_ssl",
-        "ssl_keyfile",
-        "ssl_certfile",
-        "ssl_ca_certfile",
+        const_cast<char*>("use_ssl"),
+        const_cast<char*>("ssl_keyfile"),
+        const_cast<char*>("ssl_certfile"),
+        const_cast<char*>("ssl_ca_certfile"),
         nullptr
     };
 
@@ -96,6 +99,7 @@ PyObject* pyignite_dbapi_connect(PyObject*, PyObject* args, 
PyObject* kwargs) {
     const char *schema = nullptr;
     const char *timezone = nullptr;
     int timeout = 0;
+    double heartbeat_interval = 30.0;
     int page_size = 0;
     int autocommit = 1;
     int use_ssl = 0;
@@ -103,8 +107,8 @@ PyObject* pyignite_dbapi_connect(PyObject*, PyObject* args, 
PyObject* kwargs) {
     const char *ssl_certfile = nullptr;
     const char *ssl_ca_certfile = nullptr;
 
-    int parsed = PyArg_ParseTupleAndKeywords(args, kwargs, "O|$ssssiippsss", 
kwlist, &address, &identity, &secret,
-        &schema, &timezone, &timeout, &page_size, &autocommit, &use_ssl, 
&ssl_keyfile, &ssl_certfile, &ssl_ca_certfile);
+    int parsed = PyArg_ParseTupleAndKeywords(args, kwargs, "O|$ssssidippsss", 
kwlist, &address, &identity, &secret, &schema,
+        &timezone, &timeout, &heartbeat_interval, &page_size, &autocommit, 
&use_ssl, &ssl_keyfile, &ssl_certfile, &ssl_ca_certfile);
 
     if (!parsed)
         return nullptr;
@@ -157,7 +161,7 @@ PyObject* pyignite_dbapi_connect(PyObject*, PyObject* args, 
PyObject* kwargs) {
 
     ssl_config ssl_cfg(use_ssl != 0, ssl_keyfile, ssl_certfile, 
ssl_ca_certfile);
 
-    return make_connection(std::move(addresses), schema, identity, secret, 
page_size, timeout, autocommit != 0, std::move(ssl_cfg));
+    return make_connection(std::move(addresses), schema, identity, secret, 
page_size, timeout, heartbeat_interval, autocommit != 0, std::move(ssl_cfg));
 }
 
 PyMethodDef methods[] = {
diff --git a/modules/platforms/python/cpp_module/node_connection.h 
b/modules/platforms/python/cpp_module/node_connection.h
index c372027b922..a97c0508f68 100644
--- a/modules/platforms/python/cpp_module/node_connection.h
+++ b/modules/platforms/python/cpp_module/node_connection.h
@@ -19,6 +19,7 @@
 
 #include "ignite/common/end_point.h"
 #include "ignite/common/detail/bytes.h"
+#include "ignite/common/detail/thread_timer.h"
 #include "ignite/common/detail/utils.h"
 #include "ignite/network/socket_client.h"
 #include "ignite/network/network.h"
@@ -32,20 +33,50 @@
 #include <cstdint>
 #include <cassert>
 #include <optional>
+#include <memory>
 #include <random>
 #include <string>
+#include <mutex>
 
 #include "ssl_config.h"
 #include "type_conversion.h"
+#include "ignite/protocol/heartbeat_timeout.h"
+
 
 
 /**
  * A single node connection.
  * TODO: https://issues.apache.org/jira/browse/IGNITE-25744 Move connection 
logic to the protocol library.
  */
-class node_connection final {
+class node_connection final : public 
std::enable_shared_from_this<node_connection> {
 public:
     static constexpr std::int32_t DEFAULT_TIMEOUT_SECONDS = 30;
+    static constexpr std::chrono::milliseconds DEFAULT_HEARTBEAT_INTERVAL = 
std::chrono::seconds(30);
+    static constexpr std::int32_t DEFAULT_PAGE_SIZE = 1024;
+    static constexpr bool DEFAULT_AUTO_COMMIT = true;
+    static constexpr std::string_view DEFAULT_SCHEMA = "PUBLIC";
+
+    struct auth_configuration final {
+        std::string m_identity{};
+        std::string m_secret{};
+    };
+
+    struct configuration final {
+        configuration(std::vector<ignite::end_point> addresses, bool 
autocommit, ssl_config ssl_config, std::chrono::milliseconds heartbeat_interval)
+            : m_addresses(std::move(addresses))
+            , m_auto_commit(autocommit)
+            , m_ssl_configuration(std::move(ssl_config))
+            , m_heartbeat_interval(heartbeat_interval) {}
+
+        std::vector<ignite::end_point> m_addresses;
+        std::string m_schema{DEFAULT_SCHEMA};
+        auth_configuration m_auth_configuration{};
+        std::int32_t m_page_size{DEFAULT_PAGE_SIZE};
+        std::int32_t m_timeout{DEFAULT_TIMEOUT_SECONDS};
+        std::chrono::milliseconds 
m_heartbeat_interval{DEFAULT_HEARTBEAT_INTERVAL};
+        bool m_auto_commit{DEFAULT_AUTO_COMMIT};
+        ssl_config m_ssl_configuration;
+    };
 
     /**
      * Destructor.
@@ -59,48 +90,37 @@ public:
      *
      * @return Schema.
      */
-    [[nodiscard]] const std::string &get_schema() const { return m_schema; }
+    [[nodiscard]] const std::string &get_schema() const { return 
m_configuration.m_schema; }
 
     /**
      * Get page size.
      *
      * @return Page size.
      */
-    [[nodiscard]] std::int32_t get_page_size() const { return m_page_size; }
+    [[nodiscard]] std::int32_t get_page_size() const { return 
m_configuration.m_page_size; }
 
     /**
      * Get timeout.
      *
      * @return Timeout.
      */
-    [[nodiscard]] std::int32_t get_timeout() const { return m_timeout; }
+    [[nodiscard]] std::int32_t get_timeout() const { return 
m_configuration.m_timeout; }
 
     /**
      * Constructor.
      *
-     * @param addresses Addresses.
-     * @param schema Schema. Can be empty.
-     * @param auth_identity Auth identity. Can be empty.
-     * @param auth_secret Auth secret. Can be empty.
-     * @param page_size Page size.
-     * @param timeout Timeout.
-     * @param auto_commit Auto commit flag.
-     * @param ssl_cfg SSL Configuration.
-     */
-    node_connection(std::vector<ignite::end_point> addresses, std::string 
schema, std::string auth_identity,
-            std::string auth_secret, std::int32_t page_size, std::int32_t 
timeout, bool auto_commit, ssl_config ssl_cfg)
-        : m_addresses(std::move(addresses))
-        , m_schema(schema.empty() ? "PUBLIC" : std::move(schema))
-        , m_auth_identity(std::move(auth_identity))
-        , m_auth_secret(std::move(auth_secret))
-        , m_page_size(page_size > 0 ? page_size : 1024)
-        , m_timeout(timeout > 0 ? timeout : DEFAULT_TIMEOUT_SECONDS)
-        , m_auto_commit(auto_commit)
-        , m_ssl_config(std::move(ssl_cfg))
+     * @param cfg Configuration.
+     */
+    node_connection(configuration cfg)
+        : m_configuration(std::move(cfg))
+        , m_auto_commit(m_configuration.m_auto_commit)
+        , m_timer_thread(ignite::detail::thread_timer::start([] (auto&&) { /* 
Ignore */ }))
     {
+        assert(!m_configuration.m_addresses.empty());
+
         std::random_device device;
         std::mt19937 generator(device());
-        std::uniform_int_distribution<std::uint32_t> distribution(0, 
m_addresses.size());
+        std::uniform_int_distribution<std::uint32_t> distribution(0, 
m_configuration.m_addresses.size() - 1);
         m_current_address_idx = distribution(generator);
     }
 
@@ -117,13 +137,6 @@ public:
         }
     }
 
-    /**
-     * Get autocommit flag.
-     *
-     * @return Autocommit flag.
-     */
-    bool is_autocommit() const noexcept { return m_auto_commit; }
-
     /**
      * Set autocommit flag.
      *
@@ -208,7 +221,7 @@ public:
      *
      * @return @c true if the auto commit is enabled.
      */
-    [[nodiscard]] bool is_auto_commit() const { return m_auto_commit; }
+    [[nodiscard]] bool is_auto_commit() const noexcept { return m_auto_commit; 
}
 
     /**
      * Get transaction ID.
@@ -229,8 +242,9 @@ public:
         auto req_id = generate_next_req_id();
         auto request = make_request(req_id, op, wr);
 
-        send_message(request, m_timeout);
-        return receive_message_nothrow(req_id, m_timeout);
+        std::lock_guard lock(m_socket_mutex);
+        send_message(request, m_configuration.m_timeout);
+        return receive_message_nothrow(req_id, m_configuration.m_timeout);
     }
 
 private:
@@ -256,6 +270,8 @@ private:
             sent += res;
         }
 
+        m_last_message_ts = std::chrono::steady_clock::now();
+
         assert(static_cast<std::size_t>(sent) == size);
     }
 
@@ -338,6 +354,7 @@ private:
         if (res.second) {
             throw std::move(*res.second);
         }
+
         return std::move(res.first);
     }
 
@@ -389,7 +406,7 @@ private:
      * @param op Operation.
      * @param func Function.
      */
-    std::vector<std::byte> make_request(std::int64_t id, 
ignite::protocol::client_operation op,
+    static std::vector<std::byte> make_request(std::int64_t id, 
ignite::protocol::client_operation op,
         const std::function<void(ignite::protocol::writer &)> &func) {
         std::vector<std::byte> req;
         ignite::protocol::buffer_adapter buffer(req);
@@ -417,8 +434,9 @@ private:
         auto req_id = generate_next_req_id();
         auto request = make_request(req_id, op, wr);
 
-        send_message(request, m_timeout);
-        return receive_message(req_id, m_timeout);
+        std::lock_guard lock(m_socket_mutex);
+        send_message(request, m_configuration.m_timeout);
+        return receive_message(req_id, m_configuration.m_timeout);
     }
 
     /**
@@ -445,7 +463,7 @@ private:
      */
     void try_restore_connection() {
         if (!m_socket) {
-            if (m_ssl_config.m_enabled) {
+            if (m_configuration.m_ssl_configuration.m_enabled) {
                 try
                 {
                     ignite::network::ensure_ssl_loaded();
@@ -461,13 +479,13 @@ private:
                     }
 
                     throw 
ignite::ignite_error(ignite::error::code::CLIENT_SSL_CONFIGURATION,
-                        "Can not load OpenSSL library. [" + openssl_home_str + 
"]");
+                        "Can not load OpenSSL library. [path=" + 
openssl_home_str + ", error=" + err.what_str() + "]");
                 }
 
                 ignite::network::secure_configuration cfg;
-                cfg.key_path = m_ssl_config.m_ssl_keyfile;
-                cfg.cert_path = m_ssl_config.m_ssl_certfile;
-                cfg.ca_path = m_ssl_config.m_ssl_ca_certfile;
+                cfg.key_path = 
m_configuration.m_ssl_configuration.m_ssl_keyfile;
+                cfg.cert_path = 
m_configuration.m_ssl_configuration.m_ssl_certfile;
+                cfg.ca_path = 
m_configuration.m_ssl_configuration.m_ssl_ca_certfile;
 
                 m_socket = 
ignite::network::make_secure_socket_client(std::move(cfg));
             } else {
@@ -477,12 +495,12 @@ private:
 
         std::stringstream msgs;
         bool connected = false;
-        for (std::int32_t i = 0; i < m_addresses.size(); ++i) {
-            uint32_t idx = (m_current_address_idx + i) % m_addresses.size();
-            const ignite::end_point &address = m_addresses[idx];
+        for (std::int32_t i = 0; i < m_configuration.m_addresses.size(); ++i) {
+            uint32_t idx = (m_current_address_idx + i) % 
m_configuration.m_addresses.size();
+            const ignite::end_point &address = 
m_configuration.m_addresses[idx];
 
             try {
-                bool success = m_socket->connect(address.host.c_str(), 
address.port, m_timeout);
+                bool success = m_socket->connect(address.host.c_str(), 
address.port, m_configuration.m_timeout);
                 if (!success) {
                     continue;
                 }
@@ -514,20 +532,22 @@ private:
         static constexpr std::int8_t CLIENT_CODE = 4;
         m_protocol_version = ignite::protocol::protocol_version::get_current();
 
+        std::lock_guard lock(m_socket_mutex);
+
         std::map<std::string, std::string> extensions;
-        if (!m_auth_identity.empty()) {
+        if (!m_configuration.m_auth_configuration.m_identity.empty()) {
             static const std::string AUTH_TYPE{"basic"};
 
             extensions.emplace("authn-type", AUTH_TYPE);
-            extensions.emplace("authn-identity", m_auth_identity);
-            extensions.emplace("authn-secret", m_auth_secret);
+            extensions.emplace("authn-identity", 
m_configuration.m_auth_configuration.m_identity);
+            extensions.emplace("authn-secret", 
m_configuration.m_auth_configuration.m_secret);
         }
 
         std::vector<std::byte> message = make_handshake_request(CLIENT_CODE, 
m_protocol_version, extensions);
 
-        send_all(message.data(), message.size(), m_timeout);
-        receive_and_check_magic(message, m_timeout);
-        receive_message(message, m_timeout);
+        send_all(message.data(), message.size(), m_configuration.m_timeout);
+        receive_and_check_magic(message, m_configuration.m_timeout);
+        receive_message(message, m_configuration.m_timeout);
 
         auto response = ignite::protocol::parse_handshake_response(message);
         auto const &ver = response.context.get_version();
@@ -540,6 +560,13 @@ private:
         if (response.error) {
             throw ignite::ignite_error(ignite::error::code::HANDSHAKE_HEADER, 
"Server rejected handshake with error: " + response.error->what_str());
         }
+
+        m_heartbeat_interval = ignite::calculate_heartbeat_interval(
+            m_configuration.m_heartbeat_interval, 
std::chrono::milliseconds(response.idle_timeout_ms));
+
+        if (m_heartbeat_interval.count()) {
+            plan_heartbeat(m_heartbeat_interval);
+        }
     }
 
     /**
@@ -603,36 +630,50 @@ private:
     void on_observable_timestamp(std::int64_t timestamp) {
         auto expected = m_observable_timestamp.load();
         while (expected < timestamp) {
-            auto success = 
m_observable_timestamp.compare_exchange_weak(expected, timestamp);
-            if (success)
+            if (m_observable_timestamp.compare_exchange_weak(expected, 
timestamp))
                 return;
             expected = m_observable_timestamp.load();
         }
     }
 
-    /** Addresses. */
-    const std::vector<ignite::end_point> m_addresses;
+    void send_heartbeat() {
+        auto [data, err] = 
sync_request_nothrow(ignite::protocol::client_operation::HEARTBEAT, 
[](auto&){});
+        if (!err) {
+            plan_heartbeat(m_heartbeat_interval);
+        }
 
-    /** Schema. */
-    const std::string m_schema;
+        // There is no useful payload for us in the heartbeat response.
+        UNUSED_VALUE(data);
+    }
 
-    /** Identity. */
-    const std::string m_auth_identity;
+    void on_heartbeat_timeout() {
+        auto idle_for = std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::steady_clock::now() - m_last_message_ts);
 
-    /** Secret. */
-    const std::string m_auth_secret;
+        if (idle_for > m_heartbeat_interval) {
+            send_heartbeat();
+        } else {
+            auto sleep_for = m_heartbeat_interval - idle_for;
+            plan_heartbeat(sleep_for);
+        }
+    }
 
-    /** Page size. */
-    const std::int32_t m_page_size;
+    void plan_heartbeat(std::chrono::milliseconds timeout) {
+        m_timer_thread->add(timeout, [self_weak = weak_from_this()] {
+            if (auto self = self_weak.lock()) {
+                self->on_heartbeat_timeout();
+            }
+        });
+    }
 
-    /** Current address index. */
-    std::uint32_t m_current_address_idx{0};
+    /** Configuration. */
+    const configuration m_configuration;
 
-    /** Operation timeout in seconds. */
-    std::int32_t m_timeout{DEFAULT_TIMEOUT_SECONDS};
+    /** Auto-commit. */
+    bool m_auto_commit;
 
-    /** Autocommit flag. */
-    bool m_auto_commit{true};
+    /** Current address index. */
+    std::uint32_t m_current_address_idx{0};
 
     /** Current transaction ID. */
     std::optional<std::int64_t> m_transaction_id;
@@ -652,6 +693,15 @@ private:
     /** Observable timestamp. */
     std::atomic_int64_t m_observable_timestamp{0};
 
-    /** SSL Configuration. */
-    const ssl_config m_ssl_config;
+    /** Heartbeat interval. */
+    std::chrono::milliseconds m_heartbeat_interval{0};
+
+    /** Last message timestamp. */
+    std::chrono::steady_clock::time_point m_last_message_ts{};
+
+    /** Timer thread. */
+    std::shared_ptr<ignite::detail::thread_timer> m_timer_thread;
+
+    /** Socket mutex. */
+    std::recursive_mutex m_socket_mutex;
 };
diff --git a/modules/platforms/python/cpp_module/py_connection.cpp 
b/modules/platforms/python/cpp_module/py_connection.cpp
index 68e468d760d..3c1ff9abb55 100644
--- a/modules/platforms/python/cpp_module/py_connection.cpp
+++ b/modules/platforms/python/cpp_module/py_connection.cpp
@@ -34,7 +34,7 @@ namespace {
 struct py_connection {
     PyObject_HEAD
 
-    node_connection *m_connection;
+    std::shared_ptr<node_connection> *m_connection;
 };
 
 /**
@@ -54,7 +54,7 @@ bool py_connection_expect_open(const py_connection* self) {
 PyObject* py_connection_close(py_connection* self, PyObject*)
 {
     if (self->m_connection) {
-        self->m_connection->close();
+        (*self->m_connection)->close();
 
         delete self->m_connection;
         self->m_connection = nullptr;
@@ -85,7 +85,7 @@ PyObject* py_connection_autocommit(py_connection* self, 
PyObject*)
     if (!py_connection_expect_open(self))
         return nullptr;
 
-    if (self->m_connection->is_autocommit()) {
+    if ((*self->m_connection)->is_auto_commit()) {
         Py_RETURN_TRUE;
     }
     Py_RETURN_FALSE;
@@ -101,7 +101,7 @@ PyObject* py_connection_set_autocommit(py_connection* self, 
PyObject* value)
         return nullptr;
     }
 
-    self->m_connection->set_autocommit(value == Py_True);
+    (*self->m_connection)->set_autocommit(value == Py_True);
 
     Py_RETURN_NONE;
 }
@@ -112,7 +112,7 @@ PyObject* py_connection_commit(py_connection* self, 
PyObject*)
         return nullptr;
 
     try {
-        self->m_connection->transaction_commit();
+        (*self->m_connection)->transaction_commit();
     } catch (const ignite::ignite_error& err) {
         set_error(err);
         return nullptr;
@@ -126,7 +126,7 @@ PyObject* py_connection_rollback(py_connection* self, 
PyObject*)
         return nullptr;
 
     try {
-        self->m_connection->transaction_rollback();
+        (*self->m_connection)->transaction_rollback();
     } catch (const ignite::ignite_error& err) {
         set_error(err);
         return nullptr;
@@ -194,21 +194,34 @@ int register_py_connection_type(PyObject* mod) {
 }
 
 PyObject *make_py_connection(std::vector<ignite::end_point> addresses, const 
char* schema, const char* identity,
-    const char* secret, int page_size, int timeout, bool autocommit, 
ssl_config &&ssl_cfg) {
+    const char* secret, int page_size, int timeout, double heartbeat_interval, 
bool autocommit, ssl_config &&ssl_cfg) {
     if (addresses.empty()) {
         PyErr_SetString(py_get_module_interface_error_class(), "No addresses 
provided to connect");
         return nullptr;
     }
 
-    auto node_connection = std::make_unique<class node_connection>(
-        addresses,
-        schema ? schema : "",
-        identity ? identity : "",
-        secret ? secret : "",
-        page_size ? page_size : 1024,
-        timeout,
-        autocommit,
-        std::move(ssl_cfg));
+    if (heartbeat_interval < 0.0)
+        heartbeat_interval = 0.0;
+
+    std::chrono::milliseconds heartbeat_interval_chrono = 
std::chrono::milliseconds(static_cast<int>(std::ceil(heartbeat_interval * 
1000)));
+    node_connection::configuration cfg{addresses, autocommit, ssl_cfg, 
heartbeat_interval_chrono};
+
+    if (schema)
+        cfg.m_schema = schema;
+
+    if (identity)
+        cfg.m_auth_configuration.m_identity = identity;
+
+    if (secret)
+        cfg.m_auth_configuration.m_secret = secret;
+
+    if (page_size)
+        cfg.m_page_size = page_size;
+
+    if (timeout)
+        cfg.m_timeout = timeout;
+
+    auto node_connection = std::make_shared<class node_connection>(cfg);
 
     try {
         node_connection->establish();
@@ -221,7 +234,7 @@ PyObject *make_py_connection(std::vector<ignite::end_point> 
addresses, const cha
     if (!py_conn_obj)
         return nullptr;
 
-    py_conn_obj->m_connection = node_connection.release();
+    py_conn_obj->m_connection = new std::shared_ptr<class 
node_connection>(std::move(node_connection));
 
     return reinterpret_cast<PyObject *>(py_conn_obj);
 }
diff --git a/modules/platforms/python/cpp_module/py_connection.h 
b/modules/platforms/python/cpp_module/py_connection.h
index bcd885d1231..a1522c35cea 100644
--- a/modules/platforms/python/cpp_module/py_connection.h
+++ b/modules/platforms/python/cpp_module/py_connection.h
@@ -34,13 +34,14 @@
  * @param identity Identity.
  * @param secret Secret.
  * @param page_size Page size.
- * @param timeout Timeout.
+ * @param timeout Timeout in seconds.
+ * @param heartbeat_interval Heartbeat interval in seconds.
  * @param autocommit Autocommit flag.
  * @param ssl_cfg SSL Config.
  * @return A new connection class instance.
  */
 PyObject* make_py_connection(std::vector<ignite::end_point> addresses, const 
char* schema, const char* identity,
-    const char* secret, int page_size, int timeout, bool autocommit, 
ssl_config &&ssl_cfg);
+    const char* secret, int page_size, int timeout, double heartbeat_interval, 
bool autocommit, ssl_config &&ssl_cfg);
 
 /**
  * Prepare PyConnection type for registration.
diff --git a/modules/platforms/python/cpp_module/statement.cpp 
b/modules/platforms/python/cpp_module/statement.cpp
index 00840e1e71e..6076ee0ba68 100644
--- a/modules/platforms/python/cpp_module/statement.cpp
+++ b/modules/platforms/python/cpp_module/statement.cpp
@@ -132,7 +132,7 @@ void statement::close() noexcept {
         if (!m_query_id)
             return;
 
-        auto res = 
m_connection.sync_request_nothrow(ignite::protocol::client_operation::SQL_CURSOR_CLOSE,
+        auto res = 
m_connection->sync_request_nothrow(ignite::protocol::client_operation::SQL_CURSOR_CLOSE,
             [&](ignite::protocol::writer &writer) { writer.write(*m_query_id); 
});
 
         UNUSED_VALUE res;
@@ -156,16 +156,16 @@ void statement::execute(const char *query, 
py_parameter_set &params) {
     close();
 
     m_query = query;
-    auto &schema = m_connection.get_schema();
+    auto &schema = m_connection->get_schema();
 
     bool single = !params.is_batch_query();
 
-    auto tx = m_connection.get_transaction_id();
-    if (!tx && !m_connection.is_auto_commit()) {
+    auto tx = m_connection->get_transaction_id();
+    if (!tx && !m_connection->is_auto_commit()) {
         // Starting transaction if it's not started already.
-        m_connection.transaction_start();
+        m_connection->transaction_start();
 
-        tx = m_connection.get_transaction_id();
+        tx = m_connection->get_transaction_id();
         assert(tx);
     }
 
@@ -173,15 +173,15 @@ void statement::execute(const char *query, 
py_parameter_set &params) {
         ? ignite::protocol::client_operation::SQL_EXEC
         : ignite::protocol::client_operation::SQL_EXEC_BATCH;
 
-    auto [resp, err] = m_connection.sync_request_nothrow(client_op, 
[&](ignite::protocol::writer &writer) {
+    auto [resp, err] = m_connection->sync_request_nothrow(client_op, 
[&](ignite::protocol::writer &writer) {
         if (tx)
             writer.write(*tx);
         else
             writer.write_nil();
 
         writer.write(schema);
-        writer.write(m_connection.get_page_size());
-        writer.write(std::int64_t(m_connection.get_timeout()) * 1000);
+        writer.write(m_connection->get_page_size());
+        writer.write(std::int64_t(m_connection->get_timeout()) * 1000);
         writer.write_nil(); // Session timeout (unused, session is closed by 
the server immediately).
         writer.write_nil(); // Timezone
 
@@ -195,7 +195,7 @@ void statement::execute(const char *query, py_parameter_set 
&params) {
 
         writer.write(m_query);
         params.write(writer);
-        writer.write(m_connection.get_observable_timestamp());
+        writer.write(m_connection->get_observable_timestamp());
     });
 
     // Check error
@@ -211,7 +211,7 @@ void statement::execute(const char *query, py_parameter_set 
&params) {
         throw std::move(*err);
     }
 
-    m_connection.mark_transaction_non_empty();
+    m_connection->mark_transaction_non_empty();
 
     auto &response = resp;
     ignite::protocol::reader reader(response.get_bytes_view());
@@ -279,7 +279,7 @@ bool statement::fetch_next_row() {
                 ignite::error::code::CURSOR_ALREADY_CLOSED, "Cursor already 
closed.");
         }
 
-        auto [response, err] = m_connection.sync_request_nothrow(
+        auto [response, err] = m_connection->sync_request_nothrow(
             ignite::protocol::client_operation::SQL_CURSOR_NEXT_PAGE,
             [&](ignite::protocol::writer &writer) { writer.write(*m_query_id); 
});
 
@@ -301,10 +301,10 @@ bool statement::fetch_next_row() {
 }
 
 void statement::update_meta() {
-    auto &schema = m_connection.get_schema();
+    auto &schema = m_connection->get_schema();
 
-    auto tx = m_connection.get_transaction_id();
-    auto [response, err] = 
m_connection.sync_request_nothrow(ignite::protocol::client_operation::SQL_QUERY_META,
+    auto tx = m_connection->get_transaction_id();
+    auto [response, err] = 
m_connection->sync_request_nothrow(ignite::protocol::client_operation::SQL_QUERY_META,
         [&](ignite::protocol::writer &writer) {
             if (tx)
                 writer.write(*tx);
@@ -316,7 +316,7 @@ void statement::update_meta() {
         });
 
     if (tx) {
-        m_connection.mark_transaction_non_empty();
+        m_connection->mark_transaction_non_empty();
     }
 
     if (err) {
diff --git a/modules/platforms/python/cpp_module/statement.h 
b/modules/platforms/python/cpp_module/statement.h
index 9634fa36309..87fc00ba089 100644
--- a/modules/platforms/python/cpp_module/statement.h
+++ b/modules/platforms/python/cpp_module/statement.h
@@ -111,8 +111,8 @@ public:
      *
      * @param connection Connection.
      */
-    explicit statement(node_connection &connection)
-        : m_connection(connection) { }
+    explicit statement(std::shared_ptr<node_connection> connection)
+        : m_connection(std::move(connection)) { }
 
     /**
      * Destructor.
@@ -215,7 +215,7 @@ private:
     void set_params_meta(std::vector<sql_parameter> value);
 
     /** Connection associated with the statement. */
-    node_connection &m_connection;
+    std::shared_ptr<node_connection> m_connection;
 
     /** SQL query. */
     std::string m_query;
diff --git a/modules/platforms/python/cpp_module/utils.cpp 
b/modules/platforms/python/cpp_module/utils.cpp
index 7b7f08c8dac..a9248ab8fc1 100644
--- a/modules/platforms/python/cpp_module/utils.cpp
+++ b/modules/platforms/python/cpp_module/utils.cpp
@@ -156,7 +156,7 @@ void set_error(const ignite::ignite_error &error) {
         }
 
         case ignite::error::code::UNRESOLVABLE_CONSISTENT_ID:
-        case ignite::error::code::PORT_IN_USE:
+        case ignite::error::code::BIND:
         case ignite::error::code::FILE_TRANSFER:
         case ignite::error::code::FILE_VALIDATION:
         case ignite::error::code::RECIPIENT_LEFT:
@@ -210,6 +210,7 @@ void set_error(const ignite::ignite_error &error) {
         case ignite::error::code::NODE_NOT_FOUND:
         case ignite::error::code::MARSHALLING_TYPE_MISMATCH:
         case ignite::error::code::COMPUTE_JOB_CANCELLED:
+        case ignite::error::code::RESOURCE_NOT_FOUND:
         case ignite::error::code::COMPUTE_PLATFORM_EXECUTOR: {
             error_class = py_get_module_database_error_class();
             break;
@@ -259,9 +260,31 @@ void set_error(const ignite::ignite_error &error) {
             error_class = py_get_module_database_error_class();
             break;
         }
-    }
 
-    PyErr_SetString(error_class, error.what());
+        case ignite::error::code::UNSUPPORTED_TABLE_BASED_REPLICATION:
+        case ignite::error::code::OPERATION_TIMEOUT:
+        case ignite::error::code::TX_DELAYED_ACK:
+        case ignite::error::code::GROUP_OVERLOADED:
+        case ignite::error::code::GROUP_UNAVAILABLE:
+        case ignite::error::code::EMPTY_DATA_NODES:
+        case ignite::error::code::JOIN_DENIED:
+        case ignite::error::code::EMPTY_ASSIGNMENTS:
+        case ignite::error::code::NOT_ENOUGH_ALIVE_NODES:
+        case ignite::error::code::ILLEGAL_NODES_SET:
+        case ignite::error::code::REQUEST_FORWARD:
+        case ignite::error::code::REMOTE_NODE:
+        case ignite::error::code::CONFIGURATION_APPLY:
+        case ignite::error::code::CONFIGURATION_PARSE:
+        case ignite::error::code::CONFIGURATION_VALIDATION: {
+            error_class = py_get_module_not_supported_error_class();
+            break;
+        }
+    }
+    std::string error_str{error.what_str()};
+    if (error.get_java_stack_trace()) {
+        error_str += "\n" + *error.get_java_stack_trace();
+    }
+    PyErr_SetString(error_class, error_str.c_str());
 }
 
 std::string get_current_exception_as_string() {
@@ -278,7 +301,7 @@ std::string get_current_exception_as_string() {
     return {data, std::size_t(len)};
 }
 
-const char* py_object_get_typename(PyObject* obj) {
+const char* py_object_get_typename(const PyObject* obj) {
     if (!obj || !obj->ob_type || !obj->ob_type->tp_name) {
         return "Unknown";
     }
diff --git a/modules/platforms/python/cpp_module/utils.h 
b/modules/platforms/python/cpp_module/utils.h
index 56320c89af3..76a00f08a1c 100644
--- a/modules/platforms/python/cpp_module/utils.h
+++ b/modules/platforms/python/cpp_module/utils.h
@@ -61,7 +61,7 @@ std::string get_current_exception_as_string();
  * @param obj Object.
  * @return Typename if available, and "Unknown" otherwise.
  */
-const char* py_object_get_typename(PyObject* obj);
+const char* py_object_get_typename(const PyObject* obj);
 
 /**
  * Get the module instance.
diff --git a/modules/platforms/python/pyignite_dbapi/__init__.py 
b/modules/platforms/python/pyignite_dbapi/__init__.py
index 0d7894db738..7d6b70afde4 100644
--- a/modules/platforms/python/pyignite_dbapi/__init__.py
+++ b/modules/platforms/python/pyignite_dbapi/__init__.py
@@ -693,6 +693,8 @@ def connect(address: [str], **kwargs) -> Connection:
         Maximum number of rows that can be received or sent in a single 
request. Default value: 1024.
     timeout: int, optional
         A timeout for network operations, in seconds. Default value: 30.
+    heartbeat_interval: float, optional
+        An interval between heartbeat probes, in seconds. A zero or negative 
value disables heartbeats. Default value: 30.
     autocommit: bool, optional
         Connection autocommit mode. Default value: True (enabled).
     use_ssl: bool, optional
diff --git a/modules/platforms/python/tests/conftest.py 
b/modules/platforms/python/tests/conftest.py
index 20b9dc94288..16a4cc908cb 100644
--- a/modules/platforms/python/tests/conftest.py
+++ b/modules/platforms/python/tests/conftest.py
@@ -31,7 +31,13 @@ def table_name(request):
 
 @pytest.fixture()
 def connection():
-    conn = pyignite_dbapi.connect(address=server_addresses_basic, 
page_size=TEST_PAGE_SIZE)
+    conn = pyignite_dbapi.connect(address=server_addresses_basic, 
page_size=TEST_PAGE_SIZE, heartbeat_interval=2)
+    yield conn
+    conn.close()
+
[email protected]()
+def service_connection():
+    conn = pyignite_dbapi.connect(address=server_addresses_basic, 
page_size=TEST_PAGE_SIZE, heartbeat_interval=2)
     yield conn
     conn.close()
 
@@ -44,10 +50,17 @@ def cursor(connection):
 
 
 @pytest.fixture()
-def drop_table_cleanup(cursor, table_name):
+def service_cursor(service_connection):
+    cursor = service_connection.cursor()
+    yield cursor
+    cursor.close()
+
+
[email protected]()
+def drop_table_cleanup(service_cursor, table_name):
+    service_cursor.execute(f'drop table if exists {table_name}')
     yield None
-    cursor.connection.setautocommit(True)
-    cursor.execute(f'drop table if exists {table_name}')
+    service_cursor.execute(f'drop table if exists {table_name}')
 
 
 @pytest.fixture(autouse=True, scope="session")
diff --git a/modules/platforms/python/tests/test_connect.py 
b/modules/platforms/python/tests/test_connect.py
index 916fff72563..c6853ae2739 100644
--- a/modules/platforms/python/tests/test_connect.py
+++ b/modules/platforms/python/tests/test_connect.py
@@ -12,6 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import time
+
 import pytest
 
 import pyignite_dbapi
@@ -57,3 +59,43 @@ def test_connection_wrong_arg(address, err_msg):
     with pytest.raises(pyignite_dbapi.InterfaceError) as err:
         pyignite_dbapi.connect(address=address, timeout=1)
     assert err.match(err_msg)
+
+
[email protected]("interval", [2.0, 20.0, 0.0001])
+def test_heartbeat_enabled(table_name, drop_table_cleanup, interval):
+    row_count = 10
+    with pyignite_dbapi.connect(address=server_addresses_basic[0], 
heartbeat_interval=interval) as conn:
+        with conn.cursor() as cursor:
+            cursor.execute(f"create table {table_name}(id int primary key, 
data varchar)")
+            for key in range(row_count):
+                cursor.execute(f"insert into {table_name} values({key}, 
'data-{key*2}')")
+                assert cursor.rowcount == 1
+
+            data_out = {}
+            for key in range(row_count):
+                cursor.execute(f"select id, data from {table_name} WHERE id = 
?", [key])
+                data_out[key] = cursor.fetchone()
+                if len(data_out) == 5:
+                    time.sleep(7)
+
+            assert len(data_out) == row_count
+
+
+def test_heartbeat_disabled(table_name, drop_table_cleanup):
+    row_count = 10
+    with pyignite_dbapi.connect(address=server_addresses_basic[0], 
heartbeat_interval=0) as conn:
+        with conn.cursor() as cursor:
+            cursor.execute(f"create table {table_name}(id int primary key, 
data varchar)")
+            for key in range(row_count):
+                cursor.execute(f"insert into {table_name} values({key}, 
'data-{key*2}')")
+                assert cursor.rowcount == 1
+
+            data_out = {}
+            with pytest.raises(pyignite_dbapi.OperationalError) as err:
+                for key in range(row_count):
+                    cursor.execute(f"select id, data from {table_name} where 
id = ?", [key])
+                    data_out[key] = cursor.fetchone()
+                    if len(data_out) == 5:
+                        time.sleep(7)
+
+            assert err.match("(Connection closed by the server|Can not send a 
message to the server due to connection error)")


Reply via email to