This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new cf9c8794174 [v3-2-test] Fix ValueError when supervisor force-closes
stuck sockets after timeout (#67115) (#67162)
cf9c8794174 is described below
commit cf9c8794174eca9d49aaf90d0b05a9595298bb2e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 19 16:42:12 2026 +0530
[v3-2-test] Fix ValueError when supervisor force-closes stuck sockets after timeout (#67115) (#67162)
* Fix ValueError when supervisor force-closes stuck sockets after timeout
* Improve mock socket spec
---------
(cherry picked from commit 9bb5ff3535a46020ae22a61557aef7e806aa3e91)
Co-authored-by: AutomationDev85 <[email protected]>
---
.../src/airflow/sdk/execution_time/supervisor.py | 2 ++
.../tests/task_sdk/execution_time/test_supervisor.py | 20 +++++++-------------
2 files changed, 9 insertions(+), 13 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 0c88bc9812f..3f53a68016a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -787,6 +787,7 @@ class WatchedSubprocess:
if stuck_sockets:
log.warning("Force-closed stuck sockets", pid=self.pid,
sockets=stuck_sockets)
+ self._open_sockets.clear()
self.selector.close()
self.stdin.close()
@@ -1249,6 +1250,7 @@ class ActivitySubprocess(WatchedSubprocess):
timeout_seconds=SOCKET_CLEANUP_TIMEOUT,
)
self._cleanup_open_sockets()
+ break
if alive:
# We don't need to heartbeat if the process has shutdown, as
we are just finishing of reading the
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 49e6a65c2e9..196679708e2 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -1141,15 +1141,13 @@ class TestWatchedSubprocess:
assert rc == -signal_to_raise
@pytest.mark.execution_timeout(3)
- def test_cleanup_sockets_after_delay(self, monkeypatch, mocker,
time_machine):
+ def test_cleanup_sockets_after_delay(self, monkeypatch, mocker):
"""Supervisor should close sockets if EOF events are missed."""
monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT",
1.0)
mock_process = mocker.Mock(pid=12345)
- time_machine.move_to(time.time(), tick=False)
-
proc = ActivitySubprocess(
process_log=mocker.MagicMock(),
id=TI_ID,
@@ -1164,19 +1162,15 @@ class TestWatchedSubprocess:
proc._exit_code = 0
# Create a fake placeholder in the open socket weakref
- proc._open_sockets[mocker.MagicMock()] = "test placeholder"
- proc._process_exit_monotonic = time.time()
-
- mocker.patch.object(
- ActivitySubprocess,
- "_cleanup_open_sockets",
- side_effect=lambda: setattr(proc, "_open_sockets", {}),
- )
-
- time_machine.shift(2)
+ mock_socket = mocker.MagicMock(spec=socket.socket)
+ proc._open_sockets[mock_socket] = "test placeholder"
+ proc._process_exit_monotonic = time.monotonic() - 2
proc._monitor_subprocess()
assert len(proc._open_sockets) == 0
+ mock_socket.close.assert_called_once()
+ proc.selector.close.assert_called_once()
+ proc.stdin.close.assert_called_once()
class TestWatchedSubprocessKill: