This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9bb5ff3535a Fix ValueError when supervisor force-closes stuck sockets
after timeout (#67115)
9bb5ff3535a is described below
commit 9bb5ff3535a46020ae22a61557aef7e806aa3e91
Author: AutomationDev85 <[email protected]>
AuthorDate: Tue May 19 12:02:59 2026 +0200
Fix ValueError when supervisor force-closes stuck sockets after timeout
(#67115)
* Fix ValueError when supervisor force-closes stuck sockets after timeout
* Improve mock socket spec
---------
Co-authored-by: AutomationDev85 <AutomationDev85>
---
.../src/airflow/sdk/execution_time/supervisor.py | 2 ++
.../tests/task_sdk/execution_time/test_supervisor.py | 20 +++++++-------------
2 files changed, 9 insertions(+), 13 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 5e46b6bc864..27dcbb44346 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -823,6 +823,7 @@ class WatchedSubprocess:
if stuck_sockets:
log.warning("Force-closed stuck sockets", pid=self.pid,
sockets=stuck_sockets)
+ self._open_sockets.clear()
self.selector.close()
self.stdin.close()
@@ -1312,6 +1313,7 @@ class ActivitySubprocess(WatchedSubprocess):
timeout_seconds=SOCKET_CLEANUP_TIMEOUT,
)
self._cleanup_open_sockets()
+ break
if alive:
# We don't need to heartbeat if the process has shutdown, as
we are just finishing of reading the
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index b4ba8de42e2..881f5ddaad1 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -1165,15 +1165,13 @@ class TestWatchedSubprocess:
assert rc == -signal_to_raise
@pytest.mark.execution_timeout(3)
- def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
+ def test_cleanup_sockets_after_delay(self, monkeypatch, mocker):
"""Supervisor should close sockets if EOF events are missed."""
monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT",
1.0)
mock_process = mocker.Mock(pid=12345)
- time_machine.move_to(time.time(), tick=False)
-
proc = ActivitySubprocess(
process_log=mocker.MagicMock(),
id=TI_ID,
@@ -1188,19 +1186,15 @@ class TestWatchedSubprocess:
proc._exit_code = 0
# Create a fake placeholder in the open socket weakref
- proc._open_sockets[mocker.MagicMock()] = "test placeholder"
- proc._process_exit_monotonic = time.time()
-
- mocker.patch.object(
- ActivitySubprocess,
- "_cleanup_open_sockets",
- side_effect=lambda: setattr(proc, "_open_sockets", {}),
- )
-
- time_machine.shift(2)
+ mock_socket = mocker.MagicMock(spec=socket.socket)
+ proc._open_sockets[mock_socket] = "test placeholder"
+ proc._process_exit_monotonic = time.monotonic() - 2
proc._monitor_subprocess()
assert len(proc._open_sockets) == 0
+ mock_socket.close.assert_called_once()
+ proc.selector.close.assert_called_once()
+ proc.stdin.close.assert_called_once()
class TestWatchedSubprocessKill: