Copilot commented on code in PR #1276:
URL: 
https://github.com/apache/cassandra-python-driver/pull/1276#discussion_r2919888725


##########
tests/unit/io/test_asyncioreactor.py:
##########
@@ -1,19 +1,21 @@
 AsyncioConnection, ASYNCIO_AVAILABLE = None, False
 try:
     from cassandra.io.asyncioreactor import AsyncioConnection
-    import asynctest
     ASYNCIO_AVAILABLE = True
 except (ImportError, SyntaxError):
     AsyncioConnection = None
     ASYNCIO_AVAILABLE = False
 
 from tests import is_monkey_patched, connection_class
-from tests.unit.io.utils import TimerCallback, TimerTestMixin
+from tests.unit.io.utils import TimerCallback, TimerTestMixin, 
submit_and_wait_for_completion
 
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock, Mock, AsyncMock

Review Comment:
   Unused imports: `submit_and_wait_for_completion`, `Mock`, and `AsyncMock` 
are imported but never referenced in this test module. Please remove them to 
keep the test dependencies minimal and avoid confusion about intended 
synchronization helpers.



##########
tests/unit/io/test_asyncioreactor.py:
##########
@@ -65,6 +67,31 @@ def reset_selector():
 
         super(AsyncioTimerTests, self).setUp()
 
+    def test_multi_timer_validation(self):
+        """
+        Override with a wider tolerance for asyncio's thread-based scheduling,
+        which has inherently more jitter than libev's native event loop.
+        """
+        from tests.unit.io.utils import get_timeout
+        pending_callbacks = []
+        completed_callbacks = []
+
+        for gross_time in range(0, 100, 1):
+            timeout = get_timeout(gross_time, 0, 100, 100, False)
+            callback = TimerCallback(timeout)
+            self.create_timer(timeout, callback.invoke)
+            pending_callbacks.append(callback)
+
+        while len(pending_callbacks) != 0:
+            for callback in pending_callbacks:
+                if callback.was_invoked():
+                    pending_callbacks.remove(callback)
+                    completed_callbacks.append(callback)
+            time.sleep(.1)

Review Comment:
   `test_multi_timer_validation` mutates `pending_callbacks` while iterating 
over it (`pending_callbacks.remove(callback)` inside the `for callback in 
pending_callbacks` loop). This can skip elements and, combined with the 
unbounded `while` loop (no overall timeout), can lead to a hang if callbacks 
are missed. Consider iterating over a copy (or using a separate list of 
completed callbacks) and adding a max wait/timeout to fail fast instead of 
looping indefinitely.



##########
tests/unit/io/test_asyncioreactor.py:
##########
@@ -75,3 +102,291 @@ def test_timer_cancellation(self):
         time.sleep(.2)
         # Assert that the cancellation was honored
         self.assertFalse(callback.was_invoked())
+
+
[email protected](is_monkey_patched(), 'runtime is monkey patched for another 
reactor')
[email protected](connection_class is not AsyncioConnection,
+                 'not running asyncio tests; current connection_class is 
{}'.format(connection_class))
[email protected](ASYNCIO_AVAILABLE, "asyncio is not available for this 
runtime")
+class AsyncioConnectionTest(unittest.TestCase):
+    """
+    Tests for AsyncioConnection covering write, read, close, and error
+    handling at the reactor level.  Unlike the ReactorTestMixin used by
+    asyncore/libev, these tests exercise the public interface (push/close)
+    because handle_read/handle_write are async coroutines running inside
+    the event loop thread.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        if skip_me:
+            return
+        # Force a fresh reactor so we aren't affected by a previous test
+        # class that may have stopped the shared event loop.
+        if AsyncioConnection._loop is not None:
+            try:
+                AsyncioConnection._loop.call_soon_threadsafe(
+                    AsyncioConnection._loop.stop)
+            except RuntimeError:
+                pass
+            if AsyncioConnection._loop_thread:
+                AsyncioConnection._loop_thread.join(timeout=1.0)
+        AsyncioConnection._loop = None
+        AsyncioConnection._loop_thread = None
+        AsyncioConnection.initialize_reactor()
+        cls._loop = AsyncioConnection._loop
+        # Save original loop methods so we can restore after each test
+        cls._orig_sock_recv = cls._loop.sock_recv
+        cls._orig_sock_sendall = cls._loop.sock_sendall
+
+    @classmethod
+    def tearDownClass(cls):
+        if skip_me:
+            return
+        cls._loop.sock_recv = cls._orig_sock_recv
+        cls._loop.sock_sendall = cls._orig_sock_sendall
+
+    def _make_connection(self):
+        """
+        Create an AsyncioConnection with mocked socket and _connect_socket.
+        Loop socket methods are pre-mocked so that the handle_read/handle_write
+        coroutines started in __init__ don't hit real I/O.
+        """
+        mock_socket = MagicMock(spec=stdlib_socket.socket)
+        mock_socket.fileno.return_value = 99
+        mock_socket.setblocking = MagicMock()
+        mock_socket.connect.return_value = None
+        mock_socket.getsockopt.return_value = 0
+        mock_socket.send.side_effect = lambda x: len(x)
+
+        def fake_connect_socket(self_inner):
+            self_inner._socket = mock_socket
+
+        with patch.object(AsyncioConnection, '_connect_socket', 
fake_connect_socket):
+            conn = AsyncioConnection(
+                host='127.0.0.1',
+                cql_version='3.0.1',
+                connect_timeout=5,
+            )
+        return conn
+
+    def setUp(self):
+        if skip_me:
+            return
+
+        loop = self._loop
+
+        # Pre-mock sock_recv to block indefinitely (read loop won't spin)
+        self._recv_unblock = threading.Event()
+
+        async def blocking_recv(sock, bufsize):
+            while not self._recv_unblock.is_set():
+                await asyncio.sleep(0.01)
+            raise asyncio.CancelledError()
+
+        # Pre-mock sock_sendall to silently consume data (options message, 
etc.)
+        self._sent_data = []
+
+        async def capturing_sendall(sock, data):
+            self._sent_data.append(bytes(data))
+
+        loop.sock_recv = blocking_recv
+        loop.sock_sendall = capturing_sendall
+
+        self.conn = self._make_connection()
+        # Give the loop a moment to process __init__ tasks (options message)
+        time.sleep(0.1)
+        # Clear any data sent during init (options message)
+        self._sent_data.clear()

Review Comment:
   This test relies on fixed `time.sleep(...)` delays to wait for the event 
loop to process work (e.g., after creating the connection and in multiple test 
cases). Fixed sleeps are prone to flakiness on slow/loaded CI and can mask real 
race conditions. Prefer waiting on an explicit signal/condition with a timeout 
(e.g., polling for `_sent_data` length, using a `threading.Event`, or awaiting 
a `Future` from `run_coroutine_threadsafe(...).result(timeout=...)`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to