This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 86397cfdf0d [v3-0-test] Explicitly close log file descriptor in the 
supervise function (#51627) (#51654)
86397cfdf0d is described below

commit 86397cfdf0d756f339f08e1d889a3374c8c793c5
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jun 12 20:39:03 2025 +0530

    [v3-0-test] Explicitly close log file descriptor in the supervise function 
(#51627) (#51654)
    
    We didn't close log file descriptor properly hence leading to too many open 
files error from the operating system.
    (cherry picked from commit 4a0a89b6733002b8adccaf09d9f76d26c0d9d706)
    
    Co-authored-by: Adylzhan Khashtamov <adil.khashta...@gmail.com>
---
 task-sdk/src/airflow/sdk/execution_time/supervisor.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b2dd76d85b1..f3cc633fb63 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -35,6 +35,7 @@ from http import HTTPStatus
 from socket import SO_SNDBUF, SOL_SOCKET, SocketIO, socket, socketpair
 from typing import (
     TYPE_CHECKING,
+    BinaryIO,
     Callable,
     ClassVar,
     NoReturn,
@@ -1512,6 +1513,7 @@ def supervise(
 
     # TODO: Use logging providers to handle the chunked upload for us etc.
     logger: FilteringBoundLogger | None = None
+    log_file_descriptor: BinaryIO | TextIO | None = None
     if log_path:
         # If we are told to write logs to a file, redirect the task logger to 
it. Make sure we append to the
         # file though, otherwise when we resume we would lose the logs from 
the start->deferral segment if it
@@ -1522,9 +1524,11 @@ def supervise(
 
         pretty_logs = False
         if pretty_logs:
-            underlying_logger: WrappedLogger = 
structlog.WriteLogger(log_file.open("a", buffering=1))
+            log_file_descriptor = log_file.open("a", buffering=1)
+            underlying_logger: WrappedLogger = 
structlog.WriteLogger(cast("TextIO", log_file_descriptor))
         else:
-            underlying_logger = structlog.BytesLogger(log_file.open("ab"))
+            log_file_descriptor = log_file.open("ab")
+            underlying_logger = structlog.BytesLogger(cast("BinaryIO", 
log_file_descriptor))
         processors = logging_processors(enable_pretty_log=pretty_logs)[0]
         logger = structlog.wrap_logger(underlying_logger, 
processors=processors, logger_name="task").bind()
 
@@ -1549,4 +1553,6 @@ def supervise(
     exit_code = process.wait()
     end = time.monotonic()
     log.info("Task finished", exit_code=exit_code, duration=end - start, 
final_state=process.final_state)
+    if log_path and log_file_descriptor:
+        log_file_descriptor.close()
     return exit_code

Reply via email to