This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 348c182614a [v3-2-test] Fix triggerer file handle leak when remote log upload fails (#66675) (#66684)
348c182614a is described below

commit 348c182614a77facd72c66fb47656e3c7a9d9112
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon May 11 06:10:21 2026 +0200

    [v3-2-test] Fix triggerer file handle leak when remote log upload fails (#66675) (#66684)
    
    When a trigger finishes, the supervisor uploads its log to the remote log
    store and then closes the local file descriptor. If `upload_to_remote()`
    raised (e.g., S3/GCS throttling, transient network error), `close()` was
    never called and the underlying BufferedWriter — plus its 8 KiB buffer and
    the open fd — leaked for every failed upload.
    
    Wrap the cleanup in try/except/finally so the fd is always closed, and log
    the upload failure instead of letting it propagate into `handle_requests`.
    
    Surfaced in discussion #65985.
    (cherry picked from commit a6798ab8d8311e98b1c29cad54bd51cffd5ccac4)
    
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../src/airflow/jobs/triggerer_job_runner.py       | 11 ++++++++---
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 23 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 169455ac06f..4552b8a99e4 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -474,9 +474,14 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
                 self.running_triggers.discard(id)
                 self.cancelling_triggers.discard(id)
                 if factory := self.logger_cache.pop(id, None):
-                    factory.upload_to_remote()
-                    # Need to close the FD explicitly, as it is not closed when logger is removed.
-                    factory.close()
+                    try:
+                        factory.upload_to_remote()
+                    except Exception:
+                        log.exception("Failed to upload trigger logs to remote", trigger_id=id)
+                    finally:
+                        # Close the FD explicitly even if upload raised, otherwise the file
+                        # handle leaks for every failed upload.
+                        factory.close()
 
             response = messages.TriggerStateSync(
                 to_create=[],
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 550e9916e56..a6cd87d3dbd 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -402,6 +402,29 @@ def test_trigger_logger_fd_closed_when_removed(session):
     trigger_runner_supervisor.kill(force=False)
 
 
+def test_trigger_logger_fd_closed_when_upload_to_remote_raises(jobless_supervisor):
+    """If upload_to_remote() raises during finished-trigger cleanup, the FD must still be closed.
+
+    Regression test for the file handle leak referenced in
+    https://github.com/apache/airflow/discussions/65985 — without try/finally, a failed
+    remote-log upload would skip ``factory.close()`` and leak the underlying BufferedWriter
+    for every failed upload.
+    """
+    factory = MagicMock(spec=TriggerLoggingFactory)
+    factory.upload_to_remote.side_effect = RuntimeError("simulated remote-logging failure")
+
+    jobless_supervisor.logger_cache[42] = factory
+    jobless_supervisor.running_triggers.add(42)
+
+    msg = messages.TriggerStateChanges(finished=[42])
+    jobless_supervisor._handle_request(msg, log=MagicMock(spec=FilteringBoundLogger), req_id=0)
+
+    factory.upload_to_remote.assert_called_once()
+    factory.close.assert_called_once()
+    assert 42 not in jobless_supervisor.logger_cache
+    assert 42 not in jobless_supervisor.running_triggers
+
+
 class TestTriggerRunner:
     def test_run_inline_trigger_canceled(self, session) -> None:
         trigger_runner = TriggerRunner()

Reply via email to