This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new cf9c8794174 [v3-2-test] Fix ValueError when supervisor force-closes stuck sockets after timeout (#67115) (#67162)
cf9c8794174 is described below

commit cf9c8794174eca9d49aaf90d0b05a9595298bb2e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 19 16:42:12 2026 +0530

    [v3-2-test] Fix ValueError when supervisor force-closes stuck sockets after timeout (#67115) (#67162)
    
    * Fix ValueError when supervisor force-closes stuck sockets after timeout
    
    * Improve mock socket spec
    
    ---------
    (cherry picked from commit 9bb5ff3535a46020ae22a61557aef7e806aa3e91)
    
    Co-authored-by: AutomationDev85 <[email protected]>
---
 .../src/airflow/sdk/execution_time/supervisor.py     |  2 ++
 .../tests/task_sdk/execution_time/test_supervisor.py | 20 +++++++-------------
 2 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 0c88bc9812f..3f53a68016a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -787,6 +787,7 @@ class WatchedSubprocess:
         if stuck_sockets:
             log.warning("Force-closed stuck sockets", pid=self.pid, sockets=stuck_sockets)
 
+        self._open_sockets.clear()
         self.selector.close()
         self.stdin.close()
 
@@ -1249,6 +1250,7 @@ class ActivitySubprocess(WatchedSubprocess):
                         timeout_seconds=SOCKET_CLEANUP_TIMEOUT,
                     )
                     self._cleanup_open_sockets()
+                    break
 
             if alive:
                 # We don't need to heartbeat if the process has shutdown, as we are just finishing of reading the
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 49e6a65c2e9..196679708e2 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -1141,15 +1141,13 @@ class TestWatchedSubprocess:
         assert rc == -signal_to_raise
 
     @pytest.mark.execution_timeout(3)
-    def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
+    def test_cleanup_sockets_after_delay(self, monkeypatch, mocker):
         """Supervisor should close sockets if EOF events are missed."""
 
         
monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT",
 1.0)
 
         mock_process = mocker.Mock(pid=12345)
 
-        time_machine.move_to(time.time(), tick=False)
-
         proc = ActivitySubprocess(
             process_log=mocker.MagicMock(),
             id=TI_ID,
@@ -1164,19 +1162,15 @@ class TestWatchedSubprocess:
 
         proc._exit_code = 0
         # Create a fake placeholder in the open socket weakref
-        proc._open_sockets[mocker.MagicMock()] = "test placeholder"
-        proc._process_exit_monotonic = time.time()
-
-        mocker.patch.object(
-            ActivitySubprocess,
-            "_cleanup_open_sockets",
-            side_effect=lambda: setattr(proc, "_open_sockets", {}),
-        )
-
-        time_machine.shift(2)
+        mock_socket = mocker.MagicMock(spec=socket.socket)
+        proc._open_sockets[mock_socket] = "test placeholder"
+        proc._process_exit_monotonic = time.monotonic() - 2
 
         proc._monitor_subprocess()
         assert len(proc._open_sockets) == 0
+        mock_socket.close.assert_called_once()
+        proc.selector.close.assert_called_once()
+        proc.stdin.close.assert_called_once()
 
 
 class TestWatchedSubprocessKill:

Reply via email to