This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a6798ab8d83 Fix triggerer file handle leak when remote log upload fails (#66675)
a6798ab8d83 is described below
commit a6798ab8d8311e98b1c29cad54bd51cffd5ccac4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon May 11 05:52:26 2026 +0200
Fix triggerer file handle leak when remote log upload fails (#66675)
When a trigger finishes, the supervisor uploads its log to the remote log
store and then closes the local file descriptor. If `upload_to_remote()`
raised (e.g., S3/GCS throttling, transient network error), `close()` was
never called and the underlying BufferedWriter — plus its 8 KiB buffer and
the open fd — leaked for every failed upload.
Wrap the cleanup in try/except/finally so the fd is always closed, and log
the upload failure instead of letting it propagate into `handle_requests`.
Surfaced in discussion #65985.
---
.../src/airflow/jobs/triggerer_job_runner.py | 11 ++++++++---
airflow-core/tests/unit/jobs/test_triggerer_job.py | 23 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 4966313ddcb..9edde3b276e 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -507,9 +507,14 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
self.running_triggers.discard(id)
self.cancelling_triggers.discard(id)
if factory := self.logger_cache.pop(id, None):
- factory.upload_to_remote()
- # Need to close the FD explicitly, as it is not closed when logger is removed.
- factory.close()
+ try:
+ factory.upload_to_remote()
+ except Exception:
+ log.exception("Failed to upload trigger logs to remote", trigger_id=id)
+ finally:
+ # Close the FD explicitly even if upload raised, otherwise the file
+ # handle leaks for every failed upload.
+ factory.close()
response = messages.TriggerStateSync(
to_create=[],
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 27247a42952..0501783b992 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -612,6 +612,29 @@ def test_trigger_logger_fd_closed_when_removed(session):
trigger_runner_supervisor.kill(force=False)
+def test_trigger_logger_fd_closed_when_upload_to_remote_raises(jobless_supervisor):
+ """If upload_to_remote() raises during finished-trigger cleanup, the FD must still be closed.
+
+ Regression test for the file handle leak referenced in
+ https://github.com/apache/airflow/discussions/65985 — without try/finally, a failed
+ remote-log upload would skip ``factory.close()`` and leak the underlying BufferedWriter
+ for every failed upload.
+ """
+ factory = MagicMock(spec=TriggerLoggingFactory)
+ factory.upload_to_remote.side_effect = RuntimeError("simulated remote-logging failure")
+
+ jobless_supervisor.logger_cache[42] = factory
+ jobless_supervisor.running_triggers.add(42)
+
+ msg = messages.TriggerStateChanges(finished=[42])
+ jobless_supervisor._handle_request(msg, log=MagicMock(spec=FilteringBoundLogger), req_id=0)
+
+ factory.upload_to_remote.assert_called_once()
+ factory.close.assert_called_once()
+ assert 42 not in jobless_supervisor.logger_cache
+ assert 42 not in jobless_supervisor.running_triggers
+
+
class TestTriggerRunner:
def test_blocked_main_thread_warning_threshold_decode(self) -> None:
with conf_vars({("triggerer", "blocked_main_thread_warning_threshold"): "0.5"}):