This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch ignite-27373
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 3c3b7e7d90fc63f15722dac659314878a5902324
Author: Igor Sapego <[email protected]>
AuthorDate: Thu Feb 26 22:25:01 2026 +0100

    IGNITE-27373 Add tests
---
 .../python/dbapi/pyignite_dbapi/__init__.py        |   2 +-
 modules/platforms/python/dbapi/tests/conftest.py   |   3 +-
 .../python/dbapi/tests/test_concurrency.py         | 192 +++++++++++++++++++++
 3 files changed, 195 insertions(+), 2 deletions(-)

diff --git a/modules/platforms/python/dbapi/pyignite_dbapi/__init__.py 
b/modules/platforms/python/dbapi/pyignite_dbapi/__init__.py
index 7d6b70afde4..55f291616b8 100644
--- a/modules/platforms/python/dbapi/pyignite_dbapi/__init__.py
+++ b/modules/platforms/python/dbapi/pyignite_dbapi/__init__.py
@@ -27,7 +27,7 @@ __version__ = pkgutil.get_data(__name__, 
"_version.txt").decode
 apilevel = '2.0'
 """PEP 249 is supported."""
 
-threadsafety = 1
+threadsafety = 2
 """Threads may share the module, but not connections."""
 
 paramstyle = 'qmark'
diff --git a/modules/platforms/python/dbapi/tests/conftest.py 
b/modules/platforms/python/dbapi/tests/conftest.py
index 16a4cc908cb..a565ac335ef 100644
--- a/modules/platforms/python/dbapi/tests/conftest.py
+++ b/modules/platforms/python/dbapi/tests/conftest.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import time
 
 import pyignite_dbapi
 import pytest
@@ -26,7 +27,7 @@ TEST_PAGE_SIZE = 32
 
 @pytest.fixture()
 def table_name(request):
-    return request.node.originalname
+    return f"{request.node.originalname}_{int(time.monotonic_ns())}"
 
 
 @pytest.fixture()
diff --git a/modules/platforms/python/dbapi/tests/test_concurrency.py 
b/modules/platforms/python/dbapi/tests/test_concurrency.py
new file mode 100644
index 00000000000..e78e281b580
--- /dev/null
+++ b/modules/platforms/python/dbapi/tests/test_concurrency.py
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import time
+from builtins import ExceptionGroup
+
+import pytest
+
+import pyignite_dbapi
+from tests.util import server_addresses_basic
+
+
+CONNECT_KWARGS = {"address": server_addresses_basic}
+NUM_THREADS = 50
+
+
[email protected]()
+def module_level_threadsafety():
+    assert pyignite_dbapi.threadsafety >= 1, "Module can not be used 
concurrently"
+
+
[email protected]()
+def connection_level_threadsafety(module_level_threadsafety):
+    assert pyignite_dbapi.threadsafety >= 2, "Connections can not be used 
concurrently"
+
+
[email protected]()
+def table(table_name, service_cursor, drop_table_cleanup):
+    service_cursor.execute(f"CREATE TABLE {table_name} (id int primary key, 
data varchar)")
+    yield table_name
+
+
+def run_threads(fn, n=NUM_THREADS, *args):
+    barrier = threading.Barrier(n)
+    errors = []
+
+    def wrapper(tid):
+        try:
+            barrier.wait()
+            fn(tid, *args)
+        except Exception as e:
+            errors.append(e)
+
+    threads = [threading.Thread(target=wrapper, args=(i,)) for i in range(n)]
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()
+
+    if errors:
+        raise ExceptionGroup("thread errors", errors)
+
+
+def test_concurrent_module_import(module_level_threadsafety):
+    import importlib
+
+    def task(_):
+        m = importlib.import_module(pyignite_dbapi.__name__)
+        assert m.threadsafety > 0, "Module can not be used concurrently"
+
+    run_threads(task)
+
+
+def test_concurrent_connect_use_close(module_level_threadsafety):
+    def task(_):
+        c = pyignite_dbapi.connect(**CONNECT_KWARGS)
+        with c.cursor() as cur:
+            cur.execute("SELECT 1")
+            assert cur.fetchone() is not None
+        c.close()
+
+    run_threads(task)
+
+
+def test_shared_connection_per_thread_cursors(connection, 
connection_level_threadsafety):
+    def task(_):
+        with connection.cursor() as cur:
+            cur.execute("SELECT 1")
+            row = cur.fetchone()
+            assert row is not None
+
+    run_threads(task)
+
+
+def test_concurrent_inserts_no_lost_writes(table, connection, 
connection_level_threadsafety):
+    rows_per_thread = 50
+
+    def task(thread_id):
+        with connection.cursor() as cur:
+            for i in range(rows_per_thread):
+                cur.execute(f"INSERT INTO {table} (id, data) VALUES (?, ?)", 
(thread_id * rows_per_thread + i, f"v{thread_id}-{i}"))
+
+    run_threads(task)
+
+    with connection.cursor() as cur:
+        cur.execute(f"SELECT COUNT(*) FROM {table}")
+        count = cur.fetchone()[0]
+    assert count == NUM_THREADS * rows_per_thread
+
+
+def test_concurrent_commit_and_rollback(table, connection, 
connection_level_threadsafety):
+    """Half the threads commit, half rollback. Only committed rows appear."""
+    committed_ids = []
+    lock = threading.Lock()
+
+    def task(thread_id):
+        connection.autocommit = False
+        with connection.cursor() as cur:
+            cur.execute(f"INSERT INTO {table} (id, data) VALUES (?, ?)", 
(thread_id, "x"))
+        if thread_id % 2 == 0:
+            connection.commit()
+            with lock:
+                committed_ids.append(thread_id)
+        else:
+            connection.rollback()
+
+    run_threads(task)
+
+    time.sleep(5.0)
+
+    with connection.cursor() as cur:
+        cur.execute(f"SELECT id FROM {table} ORDER BY id")
+        found_ids = {row[0] for row in cur.fetchall()}
+
+    assert found_ids == set(committed_ids)
+
+
+def test_concurrent_fetchall_result_integrity(table, connection, 
connection_level_threadsafety):
+    rows_num = 200
+    with connection.cursor() as cur:
+        cur.executemany(f"INSERT INTO {table} (id, data) VALUES (?, ?)", [(i, 
f"val-{i}") for i in range(rows_num)])
+
+    errors = []
+
+    def task(_):
+        with connection.cursor() as cur:
+            cur.execute(f"SELECT id, data FROM {table} ORDER BY id")
+            rows = cur.fetchall()
+        if len(rows) != rows_num:
+            errors.append(AssertionError(f"Expected {rows_num} rows, got 
{len(rows)}"))
+            return
+        for idx, (rid, val) in enumerate(rows):
+            if val != f"val-{rid}":
+                errors.append(AssertionError(f"Corrupted row: id={rid}, 
val={val!r}"))
+
+    run_threads(task)
+    if errors:
+        raise ExceptionGroup("result integrity errors", errors)
+
+
+def test_cursor_description_thread_safety(table, connection, 
connection_level_threadsafety):
+    expected_names = {"ID", "DATA"}
+
+    def task(_):
+        with connection.cursor() as cur:
+            cur.execute(f"SELECT id, data FROM {table} LIMIT 1")
+            desc = cur.description
+            assert desc is not None
+            col_names = {col[0] for col in desc}
+            assert col_names == expected_names, f"Unexpected columns: 
{col_names}"
+
+    run_threads(task)
+
+
+def test_concurrent_executemany(table, connection, 
connection_level_threadsafety):
+    rows_per_thread = 20
+
+    def task(thread_id):
+        rows = [(thread_id * 1000 + i, f"{thread_id}-{i}") for i in 
range(rows_per_thread)]
+        with connection.cursor() as cur:
+            cur.executemany(f"INSERT INTO {table} (id, data) VALUES (?, ?)", 
rows)
+
+    run_threads(task)
+
+    with connection.cursor() as cur:
+        cur.execute(f"SELECT COUNT(*) FROM {table}")
+        count = cur.fetchone()[0]
+
+    assert count == NUM_THREADS * rows_per_thread

Reply via email to