This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 86397cfdf0d [v3-0-test] Explicitly close log file descriptor in the supervise function (#51627) (#51654)
86397cfdf0d is described below

commit 86397cfdf0d756f339f08e1d889a3374c8c793c5
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jun 12 20:39:03 2025 +0530

    [v3-0-test] Explicitly close log file descriptor in the supervise function (#51627) (#51654)

    We didn't close log file descriptor properly hence leading to too many open files error from the operating system.
    (cherry picked from commit 4a0a89b6733002b8adccaf09d9f76d26c0d9d706)

    Co-authored-by: Adylzhan Khashtamov <adil.khashta...@gmail.com>
---
 task-sdk/src/airflow/sdk/execution_time/supervisor.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b2dd76d85b1..f3cc633fb63 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -35,6 +35,7 @@ from http import HTTPStatus
 from socket import SO_SNDBUF, SOL_SOCKET, SocketIO, socket, socketpair
 from typing import (
     TYPE_CHECKING,
+    BinaryIO,
     Callable,
     ClassVar,
     NoReturn,
@@ -1512,6 +1513,7 @@ def supervise(
 
     # TODO: Use logging providers to handle the chunked upload for us etc.
     logger: FilteringBoundLogger | None = None
+    log_file_descriptor: BinaryIO | TextIO | None = None
     if log_path:
         # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the
         # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it
@@ -1522,9 +1524,11 @@ def supervise(
 
         pretty_logs = False
         if pretty_logs:
-            underlying_logger: WrappedLogger = structlog.WriteLogger(log_file.open("a", buffering=1))
+            log_file_descriptor = log_file.open("a", buffering=1)
+            underlying_logger: WrappedLogger = structlog.WriteLogger(cast("TextIO", log_file_descriptor))
         else:
-            underlying_logger = structlog.BytesLogger(log_file.open("ab"))
+            log_file_descriptor = log_file.open("ab")
+            underlying_logger = structlog.BytesLogger(cast("BinaryIO", log_file_descriptor))
         processors = logging_processors(enable_pretty_log=pretty_logs)[0]
         logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind()
 
@@ -1549,4 +1553,6 @@ def supervise(
     exit_code = process.wait()
     end = time.monotonic()
     log.info("Task finished", exit_code=exit_code, duration=end - start, final_state=process.final_state)
+    if log_path and log_file_descriptor:
+        log_file_descriptor.close()
     return exit_code