This is an automated email from the ASF dual-hosted git repository.
ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 82b3c924dce Fix memory leak in LocalExecutor caused by unreleased file
descriptor locks (#65121)
82b3c924dce is described below
commit 82b3c924dce923d59115d53367e61cd3ab15dcee
Author: Jeongwoo Do <[email protected]>
AuthorDate: Thu May 14 04:57:40 2026 +0900
Fix memory leak in LocalExecutor caused by unreleased file descriptor locks
(#65121)
This "backports" a change we made to structlog, making the fix available in
earlier versions of Airflow.
---
shared/logging/src/airflow_shared/logging/structlog.py | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py
index 7dddafe3b0a..5441913ec19 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -24,6 +24,7 @@ import logging
import os
import re
import sys
+import weakref
from collections.abc import Callable, Iterable, Mapping, Sequence
from functools import cache, cached_property, partial
from pathlib import Path
@@ -593,6 +594,17 @@ def configure_logging(
text_output = cast("TextIO", output)
logger_factory = LoggerFactory(NamedWriteLogger, io=text_output)
+ # Replace structlog's WRITE_LOCKS dict with a WeakKeyDictionary so entries
+ # for closed file descriptors are garbage-collected instead of leaking.
+ # TODO: drop once structlog ships the upstream fix (tracked for 26.1.0).
+ try:
+ from structlog import _output as _structlog_output
+
+ if isinstance(_structlog_output.WRITE_LOCKS, dict):
+ _structlog_output.WRITE_LOCKS = weakref.WeakKeyDictionary()  # type: ignore[assignment]
+ except Exception:
+ pass
+
structlog.configure(
processors=shared_pre_chain + [for_structlog],
cache_logger_on_first_use=cache_logger_on_first_use,