This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4a0a89b6733 Explicitly close log file descriptor in the supervise
function (#51627)
4a0a89b6733 is described below
commit 4a0a89b6733002b8adccaf09d9f76d26c0d9d706
Author: Adylzhan Khashtamov <[email protected]>
AuthorDate: Thu Jun 12 18:10:33 2025 +0500
Explicitly close log file descriptor in the supervise function (#51627)
We didn't close log file descriptor properly hence leading to too many open
files error from the operating system.
---
task-sdk/src/airflow/sdk/execution_time/supervisor.py | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 3fb5a9877a9..dc63daeef7b 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -35,6 +35,7 @@ from http import HTTPStatus
from socket import SO_SNDBUF, SOL_SOCKET, SocketIO, socket, socketpair
from typing import (
TYPE_CHECKING,
+ BinaryIO,
Callable,
ClassVar,
NoReturn,
@@ -1565,6 +1566,7 @@ def supervise(
# TODO: Use logging providers to handle the chunked upload for us etc.
logger: FilteringBoundLogger | None = None
+ log_file_descriptor: BinaryIO | TextIO | None = None
if log_path:
# If we are told to write logs to a file, redirect the task logger to
it. Make sure we append to the
# file though, otherwise when we resume we would lose the logs from
the start->deferral segment if it
@@ -1575,9 +1577,11 @@ def supervise(
pretty_logs = False
if pretty_logs:
- underlying_logger: WrappedLogger =
structlog.WriteLogger(log_file.open("a", buffering=1))
+ log_file_descriptor = log_file.open("a", buffering=1)
+ underlying_logger: WrappedLogger =
structlog.WriteLogger(cast("TextIO", log_file_descriptor))
else:
- underlying_logger = structlog.BytesLogger(log_file.open("ab"))
+ log_file_descriptor = log_file.open("ab")
+ underlying_logger = structlog.BytesLogger(cast("BinaryIO",
log_file_descriptor))
processors = logging_processors(enable_pretty_log=pretty_logs)[0]
logger = structlog.wrap_logger(underlying_logger,
processors=processors, logger_name="task").bind()
@@ -1602,4 +1606,6 @@ def supervise(
exit_code = process.wait()
end = time.monotonic()
log.info("Task finished", exit_code=exit_code, duration=end - start,
final_state=process.final_state)
+ if log_path and log_file_descriptor:
+ log_file_descriptor.close()
return exit_code