This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch structlog-in-logging-mixin
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 54c87c691810dfa6297f93d2d792887e9a8e349d
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Mar 6 15:06:53 2025 +0000

    WIP: switch all airflow logging to structlog
---
 airflow-core/pyproject.toml                        |   2 +
 airflow-core/src/airflow/_logging/__init__.py      |  16 +
 airflow-core/src/airflow/_logging/structlog.py     | 472 +++++++++++++++++++++
 .../src/airflow/jobs/triggerer_job_runner.py       |   5 +-
 airflow-core/src/airflow/logging_config.py         |   9 +-
 .../airflow/utils/log/file_processor_handler.py    |   4 +-
 .../src/airflow/utils/log/logging_mixin.py         |  19 +-
 airflow-core/src/airflow/utils/retries.py          |   2 +-
 .../tests/unit/always/test_providers_manager.py    |   9 +-
 .../tests/unit/core/test_logging_config.py         | 344 ---------------
 airflow-core/tests/unit/core/test_stats.py         |   8 +-
 .../unit/sensors/test_external_task_sensor.py      |  18 +-
 .../tests/unit/utils/log/test_log_reader.py        |  28 +-
 airflow-core/tests/unit/utils/test_log_handlers.py |   6 +-
 .../tests/unit/utils/test_logging_mixin.py         |  12 -
 .../tests/unit/utils/test_process_utils.py         |  11 +-
 .../test_task_handler_with_custom_formatter.py     | 115 -----
 devel-common/src/tests_common/pytest_plugin.py     |  70 ++-
 devel-common/src/tests_common/test_utils/logs.py   | 191 +++++++++
 19 files changed, 775 insertions(+), 566 deletions(-)

diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml
index d3e46ce4106..d245204123c 100644
--- a/airflow-core/pyproject.toml
+++ b/airflow-core/pyproject.toml
@@ -109,6 +109,7 @@ dependencies = [
     # See https://github.com/pygments/pygments/issues/2834
     "pygments>=2.0.1,!=2.19.0",
     "pyjwt>=2.10.0",
+    "pygtrie>=2.5.0",
     "python-daemon>=3.0.0",
     "python-dateutil>=2.7.0",
     "python-slugify>=5.0",
@@ -124,6 +125,7 @@ dependencies = [
     "sqlalchemy>=1.4.49,<2.0",
     "sqlalchemy-jsonfield>=1.0",
     "sqlalchemy-utils>=0.41.2",
+    "structlog>=25.1.0",
     "svcs>=25.1.0",
     "tabulate>=0.7.5",
     "tenacity>=8.0.0,!=8.2.0",
diff --git a/airflow-core/src/airflow/_logging/__init__.py 
b/airflow-core/src/airflow/_logging/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow-core/src/airflow/_logging/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow-core/src/airflow/_logging/structlog.py 
b/airflow-core/src/airflow/_logging/structlog.py
new file mode 100644
index 00000000000..be64336e80f
--- /dev/null
+++ b/airflow-core/src/airflow/_logging/structlog.py
@@ -0,0 +1,472 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import itertools
+import logging
+import os
+import sys
+import warnings
+from functools import cache
+from typing import Any, BinaryIO, Callable, Generic, Protocol, TextIO, 
TypeVar, Union, cast
+
+import pygtrie
+import structlog
+from structlog.processors import NAME_TO_LEVEL
+from structlog.typing import (
+    BindableLogger,
+    EventDict,
+    ExcInfo,
+    FilteringBoundLogger,
+    Processor,
+    WrappedLogger,
+)
+
+log = logging.getLogger(__name__)
+
+
+class AirflowFilteringBoundLogger(FilteringBoundLogger, Protocol):
+    def isEnabledFor(self, level: int): ...
+    def getEffectiveLevel(self) -> int: ...
+
+    name: str
+
+
+LEVEL_TO_FILTERING_LOGGER: dict[int, type[AirflowFilteringBoundLogger]] = {}
+
+
+def _make_airflow_structlogger(min_level):
+    # This uses 
https://github.com/hynek/structlog/blob/2f0cc42d/src/structlog/_native.py#L126
+    # as inspiration
+
+    LEVEL_TO_NAME = {v: k for k, v in NAME_TO_LEVEL.items()}
+
+    def isEnabledFor(self: Any, level):
+        return self.is_enabled_for(level)
+
+    def getEffectiveLevel(self: Any):
+        return self.get_effective_level()
+
+    @property
+    def name(self):
+        return self._logger.name
+
+    base = structlog.make_filtering_bound_logger(min_level)
+
+    cls = type(
+        f"AirflowBoundLoggerFilteringAt{LEVEL_TO_NAME.get(min_level, 
'Notset').capitalize()}",
+        (base,),
+        {
+            "isEnabledFor": isEnabledFor,
+            "getEffectiveLevel": getEffectiveLevel,
+            "name": name,
+        },
+    )
+    LEVEL_TO_FILTERING_LOGGER[min_level] = cls
+    return cls
+
+
+AirflowBoundLoggerFilteringAtNotset = 
_make_airflow_structlogger(NAME_TO_LEVEL["notset"])
+AirflowBoundLoggerFilteringAtDebug = 
_make_airflow_structlogger(NAME_TO_LEVEL["debug"])
+AirflowBoundLoggerFilteringAtInfo = 
_make_airflow_structlogger(NAME_TO_LEVEL["info"])
+AirflowBoundLoggerFilteringAtWarning = 
_make_airflow_structlogger(NAME_TO_LEVEL["warning"])
+AirflowBoundLoggerFilteringAtError = 
_make_airflow_structlogger(NAME_TO_LEVEL["error"])
+AirflowBoundLoggerFilteringAtCritical = 
_make_airflow_structlogger(NAME_TO_LEVEL["critical"])
+
+# We use a trie structure so that we can easily and quickly find the most 
suitable log level to apply.
+# This mirrors the logging level cascade behavior from stdlib logging, without 
the complexity of multiple
+# handlers etc
+PER_LOGGER_LEVELS = pygtrie.StringTrie(separator=".")
+PER_LOGGER_LEVELS.update(
+    {
+        # Top level logging default - changed to respect config in 
`configure_structlog`
+        "": NAME_TO_LEVEL["info"],
+    }
+)
+
+
+def make_filtering_logger() -> Callable[..., BindableLogger]:
+    def maker(logger: WrappedLogger, *args, **kwargs):
+        # If the logger is a NamedBytesLogger/NamedWriteLogger (an Airflow 
specific subclass) then
+        # look up the global per-logger config and redirect to a new class.
+
+        logger_name = kwargs.get("context", {}).get("logger_name")
+        if not logger_name and isinstance(logger, (NamedWriteLogger, 
NamedBytesLogger)):
+            logger_name = logger.name
+
+        if logger_name:
+            level = 
PER_LOGGER_LEVELS.longest_prefix(logger_name).get(PER_LOGGER_LEVELS[""])
+        else:
+            level = PER_LOGGER_LEVELS[""]
+        return LEVEL_TO_FILTERING_LOGGER[level](logger, *args, **kwargs)  # 
type: ignore[call-arg]
+
+    return maker
+
+
+class NamedBytesLogger(structlog.BytesLogger):
+    __slots__ = ("name",)
+
+    def __init__(self, name: str | None = None, file: BinaryIO | None = None):
+        self.name = name
+        super().__init__(file)
+
+
+class NamedWriteLogger(structlog.WriteLogger):
+    __slots__ = ("name",)
+
+    def __init__(self, name: str | None = None, file: TextIO | None = None):
+        self.name = name
+        super().__init__(file)
+
+
+LogOutputType = TypeVar("LogOutputType", bound=Union[TextIO, BinaryIO])
+
+
+class LoggerFactory(Generic[LogOutputType]):
+    def __init__(
+        self,
+        cls: Callable[[str | None, LogOutputType | None], WrappedLogger],
+        io: LogOutputType | None = None,
+    ):
+        self.cls = cls
+        self.io = io
+
+    def __call__(self, logger_name: str | None = None, *args: Any) -> 
WrappedLogger:
+        return self.cls(logger_name, self.io)
+
+
+def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> 
EventDict:
+    if logger_name := (event_dict.pop("logger_name", None) or getattr(logger, 
"name", None)):
+        event_dict.setdefault("logger", logger_name)
+    return event_dict
+
+
+def exception_group_tracebacks(
+    format_exception: Callable[[ExcInfo], list[dict[str, Any]]],
+) -> Processor:
+    # Make mypy happy
+    if not hasattr(__builtins__, "BaseExceptionGroup"):
+        T = TypeVar("T")
+
+        class BaseExceptionGroup(Generic[T]):
+            exceptions: list[T]
+
+    def _exception_group_tracebacks(logger: Any, method_name: Any, event_dict: 
EventDict) -> EventDict:
+        if exc_info := event_dict.get("exc_info", None):
+            group: BaseExceptionGroup[Exception] | None = None
+            if exc_info is True:
+                # `log.exception('mesg")` case
+                exc_info = sys.exc_info()
+                if exc_info[0] is None:
+                    exc_info = None
+
+            if (
+                isinstance(exc_info, tuple)
+                and len(exc_info) == 3
+                and isinstance(exc_info[1], BaseExceptionGroup)
+            ):
+                group = exc_info[1]
+            elif isinstance(exc_info, BaseExceptionGroup):
+                group = exc_info
+
+            if group:
+                # Only remove it from event_dict if we handle it
+                del event_dict["exc_info"]
+                event_dict["exception"] = list(
+                    itertools.chain.from_iterable(
+                        format_exception((type(exc), exc, exc.__traceback__))  
# type: ignore[attr-defined,arg-type]
+                        for exc in (*group.exceptions, group)
+                    )
+                )
+
+        return event_dict
+
+    return _exception_group_tracebacks
+
+
+# `eyJ` is `{"` in base64 encoding -- and any value that starts like that is 
in high likely hood a JWT
+# token. Better safe than sorry
+def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> 
EventDict:
+    for k, v in event_dict.items():
+        if isinstance(v, str) and v.startswith("eyJ"):
+            event_dict[k] = "eyJ***"
+    return event_dict
+
+
+def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) 
-> EventDict:
+    event_dict.pop("positional_args", None)
+    return event_dict
+
+
+@cache
+def structlog_processors(
+    json_output: bool,
+):
+    if json_output:
+        timestamper = structlog.processors.MaybeTimeStamper(fmt="iso")
+    else:
+        timestamper = structlog.processors.MaybeTimeStamper(fmt="%Y-%m-%d 
%H:%M:%S.%f")
+
+    processors: list[structlog.typing.Processor] = [
+        timestamper,
+        structlog.contextvars.merge_contextvars,
+        structlog.processors.add_log_level,
+        structlog.stdlib.PositionalArgumentsFormatter(),
+        logger_name,
+        redact_jwt,
+        structlog.processors.StackInfoRenderer(),
+    ]
+
+    # Imports to suppress showing code from these modules. We need the import 
to get the filepath for
+    # structlog to ignore.
+    import contextlib
+
+    import click
+    import httpcore
+    import httpx
+
+    suppress = (
+        click,
+        contextlib,
+        httpx,
+        httpcore,
+        httpx,
+    )
+
+    if json_output:
+        dict_exc_formatter = structlog.tracebacks.ExceptionDictTransformer(
+            use_rich=False, show_locals=False, suppress=suppress
+        )
+
+        dict_tracebacks = 
structlog.processors.ExceptionRenderer(dict_exc_formatter)
+        if hasattr(__builtins__, "BaseExceptionGroup"):
+            exc_group_processor = 
exception_group_tracebacks(dict_exc_formatter)
+            processors.append(exc_group_processor)
+        else:
+            exc_group_processor = None
+
+        import msgspec
+
+        def json_dumps(msg, default):
+            # Note: this is likely an "expensive" step, but lets massage the 
dict order for nice
+            # viewing of the raw JSON logs.
+            # Maybe we don't need this once the UI renders the JSON instead of 
displaying the raw text
+            msg = {
+                "timestamp": msg.pop("timestamp"),
+                "level": msg.pop("level"),
+                "event": msg.pop("event"),
+                **msg,
+            }
+            return msgspec.json.encode(msg, enc_hook=default)
+
+        def json_processor(logger: Any, method_name: Any, event_dict: 
EventDict) -> str:
+            # Stdlib logging doesn't need the re-ordering, it's fine as it is
+            return msgspec.json.encode(event_dict).decode("utf-8")
+
+        json = structlog.processors.JSONRenderer(serializer=json_dumps)
+
+        processors.extend(
+            (
+                dict_tracebacks,
+                structlog.processors.UnicodeDecoder(),
+                json,
+            ),
+        )
+
+        return processors, {
+            "timestamper": timestamper,
+            "exc_group_processor": exc_group_processor,
+            "dict_tracebacks": dict_tracebacks,
+            "json": json_processor,
+        }
+    else:
+        rich_exc_formatter = structlog.dev.RichTracebackFormatter(
+            # These values are picked somewhat arbitrarily to produce 
useful-but-compact tracebacks. If
+            # we ever need to change these then they should be configurable.
+            extra_lines=0,
+            max_frames=30,
+            indent_guides=False,
+            suppress=suppress,
+        )
+        my_styles = structlog.dev.ConsoleRenderer.get_default_level_styles()
+        my_styles["debug"] = structlog.dev.CYAN
+
+        console = structlog.dev.ConsoleRenderer(
+            exception_formatter=rich_exc_formatter, level_styles=my_styles
+        )
+        processors.append(console)
+        return processors, {
+            "timestamper": timestamper,
+            "console": console,
+        }
+
+
+def configure_structlog(
+    json_output: bool = False,
+    log_level: str = "DEBUG",
+    cache_logger_on_first_use: bool = True,
+    stdlib_config: dict | None = None,
+):
+    if "fatal" not in NAME_TO_LEVEL:
+        NAME_TO_LEVEL["fatal"] = NAME_TO_LEVEL["critical"]
+
+    stdlib_config = stdlib_config or {}
+
+    """Set up struct logging and stdlib logging config."""
+    PER_LOGGER_LEVELS[""] = NAME_TO_LEVEL[log_level.lower()]
+
+    if json_output:
+        formatter = "plain"
+    else:
+        formatter = "colored"
+    processors, named = structlog_processors(json_output)
+    timestamper = named["timestamper"]
+
+    pre_chain: list[structlog.typing.Processor] = [
+        # Add the log level and a timestamp to the event_dict if the log entry
+        # is not from structlog.
+        structlog.stdlib.add_log_level,
+        structlog.stdlib.add_logger_name,
+        timestamper,
+        structlog.contextvars.merge_contextvars,
+        redact_jwt,
+    ]
+
+    # Don't cache the loggers during tests, it make it hard to capture them
+    if "PYTEST_VERSION" in os.environ:
+        cache_logger_on_first_use = False
+
+    color_formatter: list[structlog.typing.Processor] = [
+        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+        drop_positional_args,
+    ]
+    std_lib_formatter: list[structlog.typing.Processor] = [
+        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+        drop_positional_args,
+    ]
+
+    wrapper_class = cast(type[BindableLogger], make_filtering_logger())
+    if json_output:
+        structlog.configure(
+            processors=processors,
+            cache_logger_on_first_use=cache_logger_on_first_use,
+            wrapper_class=wrapper_class,
+            logger_factory=LoggerFactory(NamedBytesLogger),  # type: 
ignore[type-var]
+        )
+
+        if processor := named["exc_group_processor"]:
+            pre_chain.append(processor)
+        pre_chain.append(named["dict_tracebacks"])
+        color_formatter.append(named["json"])
+        std_lib_formatter.append(named["json"])
+    else:
+        structlog.configure(
+            processors=processors,
+            cache_logger_on_first_use=cache_logger_on_first_use,
+            wrapper_class=wrapper_class,
+            logger_factory=LoggerFactory(NamedWriteLogger),  # type: 
ignore[type-var]
+        )
+        color_formatter.append(named["console"])
+
+    global _warnings_showwarning
+
+    if _warnings_showwarning is None:
+        _warnings_showwarning = warnings.showwarning
+        # Capture warnings and show them via structlog
+        warnings.showwarning = _showwarning
+
+    import logging.config
+
+    config = {**stdlib_config}
+    config["formatters"] = {**config["formatters"]}
+    config["handlers"] = {**config["handlers"]}
+    config["loggers"] = {**config["loggers"]}
+    config["formatters"].update(
+        {
+            "plain": {
+                "()": structlog.stdlib.ProcessorFormatter,
+                "processors": std_lib_formatter,
+                "foreign_pre_chain": pre_chain,
+                "pass_foreign_args": True,
+            },
+            "colored": {
+                "()": structlog.stdlib.ProcessorFormatter,
+                "processors": color_formatter,
+                "foreign_pre_chain": pre_chain,
+                "pass_foreign_args": True,
+            },
+        }
+    )
+    config["handlers"].update(
+        {
+            "default": {
+                "level": log_level.upper(),
+                "class": "logging.StreamHandler",
+                "formatter": formatter,
+            },
+        }
+    )
+    for log_config in config["loggers"].values():
+        # We want everything to go via structlog, remove whatever the user 
might have configured
+        log_config.pop("stream", None)
+        log_config.pop("formatter", None)
+        # log_config.pop("handlers", None)
+    config["loggers"].update(
+        {
+            # Set Airflow logging to the level requested, but most everything 
else at "INFO"
+            "airflow": {"level": log_level.upper()},
+            # These ones are too chatty even at info
+            "httpx": {"level": "WARN"},
+            "sqlalchemy.engine": {"level": "WARN"},
+        }
+    )
+    config["root"] = {
+        "handlers": ["default"],
+        "level": "INFO",
+        "propagate": True,
+    }
+
+    logging.config.dictConfig(config)
+
+
+_warnings_showwarning: Any = None
+
+
+def _showwarning(
+    message: Warning | str,
+    category: type[Warning],
+    filename: str,
+    lineno: int,
+    file: TextIO | None = None,
+    line: str | None = None,
+) -> Any:
+    """
+    Redirects warnings to structlog so they appear in task logs etc.
+
+    Implementation of showwarnings which redirects to logging, which will first
+    check to see if the file parameter is None. If a file is specified, it will
+    delegate to the original warnings implementation of showwarning. Otherwise,
+    it will call warnings.formatwarning and will log the resulting string to a
+    warnings logger named "py.warnings" with level logging.WARNING.
+    """
+    if file is not None:
+        if _warnings_showwarning is not None:
+            _warnings_showwarning(message, category, filename, lineno, file, 
line)
+    else:
+        log = structlog.get_logger("py.warnings")
+        log.warning(str(message), category=category.__name__, 
filename=filename, lineno=lineno)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py 
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index bcf0bb9bb96..3d3a462dfef 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -547,7 +547,7 @@ class TriggerDetails(TypedDict):
     events: int
 
 
-class TriggerRunner:
+class TriggerRunner(LoggingMixin):
     """
     Runtime environment for all triggers.
 
@@ -579,9 +579,6 @@ class TriggerRunner:
     # TODO: set this in a sig-int handler
     stop: bool = False
 
-    # TODO: connect this to the parent process
-    log: FilteringBoundLogger = structlog.get_logger()
-
     requests_sock: asyncio.StreamWriter
 
     def __init__(self):
diff --git a/airflow-core/src/airflow/logging_config.py 
b/airflow-core/src/airflow/logging_config.py
index f0497c57409..0873b8b29bf 100644
--- a/airflow-core/src/airflow/logging_config.py
+++ b/airflow-core/src/airflow/logging_config.py
@@ -19,8 +19,8 @@ from __future__ import annotations
 
 import logging
 import warnings
-from logging.config import dictConfig
 
+from airflow._logging.structlog import configure_structlog
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
 from airflow.utils.module_loading import import_string
@@ -65,20 +65,21 @@ def configure_logging():
             if "mask_secrets" not in task_handler_config["filters"]:
                 task_handler_config["filters"].append("mask_secrets")
 
+        level: str = conf.get_mandatory_value("logging", 
"LOGGING_LEVEL").upper()
         # Try to init logging
-        dictConfig(logging_config)
+        configure_structlog(log_level=level, stdlib_config=logging_config)
     except (ValueError, KeyError) as e:
         log.error("Unable to load the config, contains a configuration error.")
         # When there is an error in the config, escalate the exception
         # otherwise Airflow would silently fall back on the default config
         raise e
 
-    validate_logging_config(logging_config)
+    validate_logging_config()
 
     return logging_class_path
 
 
-def validate_logging_config(logging_config):
+def validate_logging_config():
     """Validate the provided Logging Config."""
     # Now lets validate the other logging-related settings
     task_log_reader = conf.get("logging", "task_log_reader")
diff --git a/airflow-core/src/airflow/utils/log/file_processor_handler.py 
b/airflow-core/src/airflow/utils/log/file_processor_handler.py
index 5dbd034cea8..cf0cec6ed83 100644
--- a/airflow-core/src/airflow/utils/log/file_processor_handler.py
+++ b/airflow-core/src/airflow/utils/log/file_processor_handler.py
@@ -25,7 +25,7 @@ from pathlib import Path
 from airflow import settings
 from airflow.utils import timezone
 from airflow.utils.helpers import parse_template_string
-from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
+from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 
 logger = logging.getLogger(__name__)
@@ -69,7 +69,7 @@ class FileProcessorHandler(logging.Handler):
             self._symlink_latest_log_directory()
             self._cur_date = datetime.today()
 
-        return DISABLE_PROPOGATE
+        return SetContextPropagate.DISABLE_PROPAGATE
 
     def emit(self, record):
         if self.handler is not None:
diff --git a/airflow-core/src/airflow/utils/log/logging_mixin.py 
b/airflow-core/src/airflow/utils/log/logging_mixin.py
index d06ba0b7d34..785503ae13e 100644
--- a/airflow-core/src/airflow/utils/log/logging_mixin.py
+++ b/airflow-core/src/airflow/utils/log/logging_mixin.py
@@ -19,15 +19,16 @@ from __future__ import annotations
 
 import abc
 import enum
-import logging
 import re
 import sys
 from io import TextIOBase, UnsupportedOperation
 from logging import Handler, StreamHandler
 from typing import IO, TYPE_CHECKING, Any, Optional, TypeVar, cast
 
+import structlog
+
 if TYPE_CHECKING:
-    from logging import Logger
+    from airflow._logging.structlog import AirflowFilteringBoundLogger
 
 # 7-bit C1 ANSI escape sequences
 ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
@@ -67,7 +68,7 @@ _T = TypeVar("_T")
 class LoggingMixin:
     """Convenience super-class to have a logger configured with the class 
name."""
 
-    _log: logging.Logger | None = None
+    _log: AirflowFilteringBoundLogger | None = None
 
     # Parent logger used by this class. It should match one of the loggers 
defined in the
     # `logging_config_class`. By default, this attribute is used to create the 
final name of the logger, and
@@ -104,29 +105,27 @@ class LoggingMixin:
         return logger_name
 
     @classmethod
-    def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
+    def _get_log(cls, obj: Any, clazz: type[_T]) -> 
AirflowFilteringBoundLogger:
         if obj._log is None:
             logger_name: str = cls._create_logger_name(
                 logged_class=clazz,
                 log_config_logger_name=obj._log_config_logger_name,
                 class_logger_name=obj._logger_name,
             )
-            obj._log = logging.getLogger(logger_name)
+            obj._log = structlog.get_logger(logger_name)
         return obj._log
 
     @classmethod
-    def logger(cls) -> Logger:
+    def logger(cls) -> AirflowFilteringBoundLogger:
         """Return a logger."""
         return LoggingMixin._get_log(cls, cls)
 
     @property
-    def log(self) -> Logger:
+    def log(self) -> AirflowFilteringBoundLogger:
         """Return a logger."""
         return LoggingMixin._get_log(self, self.__class__)
 
-    def _set_context(self, context):
-        if context is not None:
-            set_context(self.log, context)
+    def _set_context(self, context): ...
 
 
 class ExternalLoggingMixin:
diff --git a/airflow-core/src/airflow/utils/retries.py 
b/airflow-core/src/airflow/utils/retries.py
index 809d176ef6c..dad8af58fd3 100644
--- a/airflow-core/src/airflow/utils/retries.py
+++ b/airflow-core/src/airflow/utils/retries.py
@@ -42,7 +42,7 @@ def run_with_db_retries(max_retries: int = MAX_DB_RETRIES, 
logger: logging.Logge
         reraise=True,
         **kwargs,
     )
-    if logger and isinstance(logger, logging.Logger):
+    if logger is not None:
         retry_kwargs["before_sleep"] = tenacity.before_sleep_log(logger, 
logging.DEBUG, True)
 
     return tenacity.Retrying(**retry_kwargs)
diff --git a/airflow-core/tests/unit/always/test_providers_manager.py 
b/airflow-core/tests/unit/always/test_providers_manager.py
index feb6548e997..396c910f381 100644
--- a/airflow-core/tests/unit/always/test_providers_manager.py
+++ b/airflow-core/tests/unit/always/test_providers_manager.py
@@ -115,8 +115,8 @@ class TestProviderManager:
             )
             providers_manager._discover_hooks()
             _ = providers_manager._hooks_lazy_dict["wrong-connection-type"]
-        assert len(self._caplog.records) == 1
-        assert "Inconsistency!" in self._caplog.records[0].message
+        assert len(self._caplog.entries) == 1
+        assert "Inconsistency!" in self._caplog[0]["event"]
         assert "sftp" not in providers_manager.hooks
 
     def test_warning_logs_not_generated(self):
@@ -160,11 +160,12 @@ class TestProviderManager:
             providers_manager._discover_hooks()
             _ = providers_manager._hooks_lazy_dict["dummy"]
         assert len(self._caplog.records) == 1
-        assert "The connection type 'dummy' is already registered" in 
self._caplog.records[0].message
+        msg = self._caplog.messages[0]
+        assert msg.startswith("The connection type 'dummy' is already 
registered")
         assert (
             "different class names: 
'airflow.providers.dummy.hooks.dummy.DummyHook'"
             " and 'airflow.providers.dummy.hooks.dummy.DummyHook2'."
-        ) in self._caplog.records[0].message
+        ) in msg
 
     def test_providers_manager_register_plugins(self):
         providers_manager = ProvidersManager()
diff --git a/airflow-core/tests/unit/core/test_logging_config.py 
b/airflow-core/tests/unit/core/test_logging_config.py
deleted file mode 100644
index 29dae7ff712..00000000000
--- a/airflow-core/tests/unit/core/test_logging_config.py
+++ /dev/null
@@ -1,344 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import contextlib
-import importlib
-import logging
-import os
-import pathlib
-import sys
-import tempfile
-from unittest.mock import patch
-
-import pytest
-
-from airflow.configuration import conf
-
-from tests_common.test_utils.config import conf_vars
-
-SETTINGS_FILE_VALID = """
-LOGGING_CONFIG = {
-    'version': 1,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'airflow.task': {
-            'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} 
%%(levelname)s - %%(message)s'
-        },
-    },
-    'handlers': {
-        'console': {
-            'class': 'logging.StreamHandler',
-            'formatter': 'airflow.task',
-            'stream': 'ext://sys.stdout'
-        },
-        'task': {
-            'class': 'logging.StreamHandler',
-            'formatter': 'airflow.task',
-            'stream': 'ext://sys.stdout'
-        },
-    },
-    'loggers': {
-        'airflow.task': {
-            'handlers': ['task'],
-            'level': 'INFO',
-            'propagate': False,
-        },
-    }
-}
-"""
-
-SETTINGS_FILE_INVALID = """
-LOGGING_CONFIG = {
-    'version': 1,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'airflow.task': {
-            'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} 
%%(levelname)s - %%(message)s'
-        },
-    },
-    'handlers': {
-        'console': {
-            'class': 'logging.StreamHandler',
-            'formatter': 'airflow.task',
-            'stream': 'ext://sys.stdout'
-        }
-    },
-    'loggers': {
-        'airflow': {
-            'handlers': ['file.handler'], # this handler does not exists
-            'level': 'INFO',
-            'propagate': False
-        }
-    }
-}
-"""
-
-SETTINGS_FILE_EMPTY = """
-# Other settings here
-"""
-
-SETTINGS_DEFAULT_NAME = "custom_airflow_local_settings"
-
-
-def reset_logging():
-    """Reset Logging"""
-    manager = logging.root.manager
-    manager.disabled = logging.NOTSET
-    airflow_loggers = [
-        logger for logger_name, logger in manager.loggerDict.items() if 
logger_name.startswith("airflow")
-    ]
-    for logger in airflow_loggers:
-        if isinstance(logger, logging.Logger):
-            logger.setLevel(logging.NOTSET)
-            logger.propagate = True
-            logger.disabled = False
-            logger.filters.clear()
-            handlers = logger.handlers.copy()
-            for handler in handlers:
-                # Copied from `logging.shutdown`.
-                try:
-                    handler.acquire()
-                    handler.flush()
-                    handler.close()
-                except (OSError, ValueError):
-                    pass
-                finally:
-                    handler.release()
-                logger.removeHandler(handler)
-
-
[email protected]
-def settings_context(content, directory=None, name="LOGGING_CONFIG"):
-    """
-    Sets a settings file and puts it in the Python classpath
-
-    :param content:
-          The content of the settings file
-    :param directory: the directory
-    :param name: str
-    """
-    initial_logging_config = 
os.environ.get("AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS", "")
-    try:
-        settings_root = tempfile.mkdtemp()
-        filename = f"{SETTINGS_DEFAULT_NAME}.py"
-        if directory:
-            # Create the directory structure with __init__.py
-            dir_path = os.path.join(settings_root, directory)
-            pathlib.Path(dir_path).mkdir(parents=True, exist_ok=True)
-
-            basedir = settings_root
-            for part in directory.split("/"):
-                open(os.path.join(basedir, "__init__.py"), "w").close()
-                basedir = os.path.join(basedir, part)
-            open(os.path.join(basedir, "__init__.py"), "w").close()
-
-            # Replace slashes by dots
-            module = directory.replace("/", ".") + "." + SETTINGS_DEFAULT_NAME 
+ "." + name
-            settings_file = os.path.join(dir_path, filename)
-        else:
-            module = SETTINGS_DEFAULT_NAME + "." + name
-            settings_file = os.path.join(settings_root, filename)
-
-        with open(settings_file, "w") as handle:
-            handle.writelines(content)
-        sys.path.append(settings_root)
-
-        # Using environment vars instead of conf_vars so value is accessible
-        # to parent and child processes when using 'spawn' for multiprocessing.
-        os.environ["AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS"] = module
-        yield settings_file
-
-    finally:
-        os.environ["AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS"] = 
initial_logging_config
-        sys.path.remove(settings_root)
-
-
-class TestLoggingSettings:
-    # Make sure that the configure_logging is not cached
-    def setup_method(self):
-        self.old_modules = dict(sys.modules)
-
-    def teardown_method(self):
-        # Remove any new modules imported during the test run. This lets us
-        # import the same source files for more than one test.
-        from airflow.config_templates import airflow_local_settings
-        from airflow.logging_config import configure_logging
-
-        for mod in list(sys.modules):
-            if mod not in self.old_modules:
-                del sys.modules[mod]
-
-        reset_logging()
-        importlib.reload(airflow_local_settings)
-        configure_logging()
-
-    # When we try to load an invalid config file, we expect an error
-    def test_loading_invalid_local_settings(self):
-        from airflow.logging_config import configure_logging, log
-
-        with settings_context(SETTINGS_FILE_INVALID):
-            with patch.object(log, "error") as mock_info:
-                # Load config
-                with pytest.raises(ValueError):
-                    configure_logging()
-
-                mock_info.assert_called_once_with(
-                    "Unable to load the config, contains a configuration 
error."
-                )
-
-    def test_loading_valid_complex_local_settings(self):
-        # Test what happens when the config is somewhere in a subfolder
-        module_structure = "etc.airflow.config"
-        dir_structure = module_structure.replace(".", "/")
-        with settings_context(SETTINGS_FILE_VALID, dir_structure):
-            from airflow.logging_config import configure_logging, log
-
-            with patch.object(log, "info") as mock_info:
-                configure_logging()
-                mock_info.assert_called_once_with(
-                    "Successfully imported user-defined logging config from 
%s",
-                    
f"etc.airflow.config.{SETTINGS_DEFAULT_NAME}.LOGGING_CONFIG",
-                )
-
-    # When we try to load a valid config
-    def test_loading_valid_local_settings(self):
-        with settings_context(SETTINGS_FILE_VALID):
-            from airflow.logging_config import configure_logging, log
-
-            with patch.object(log, "info") as mock_info:
-                configure_logging()
-                mock_info.assert_called_once_with(
-                    "Successfully imported user-defined logging config from 
%s",
-                    f"{SETTINGS_DEFAULT_NAME}.LOGGING_CONFIG",
-                )
-
-    # When we load an empty file, it should go to default
-    def test_loading_no_local_settings(self):
-        with settings_context(SETTINGS_FILE_EMPTY):
-            from airflow.logging_config import configure_logging
-
-            with pytest.raises(ImportError):
-                configure_logging()
-
-    # When the key is not available in the configuration
-    def test_when_the_config_key_does_not_exists(self):
-        from airflow import logging_config
-
-        with conf_vars({("logging", "logging_config_class"): None}):
-            with patch.object(logging_config.log, "debug") as mock_debug:
-                logging_config.configure_logging()
-                mock_debug.assert_any_call("Unable to load custom logging, 
using default config instead")
-
-    # Just default
-    def test_loading_local_settings_without_logging_config(self):
-        from airflow.logging_config import configure_logging, log
-
-        with patch.object(log, "debug") as mock_info:
-            configure_logging()
-            mock_info.assert_called_once_with("Unable to load custom logging, 
using default config instead")
-
-    def test_1_9_config(self):
-        from airflow.logging_config import configure_logging
-
-        with conf_vars({("logging", "task_log_reader"): "file.task"}):
-            with pytest.warns(DeprecationWarning, match=r"file.task"):
-                configure_logging()
-            assert conf.get("logging", "task_log_reader") == "task"
-
-    def test_loading_remote_logging_with_wasb_handler(self):
-        """Test if logging can be configured successfully for Azure Blob 
Storage"""
-        pytest.importorskip(
-            "airflow.providers.microsoft.azure", reason="'microsoft.azure' 
provider not installed"
-        )
-        from airflow.config_templates import airflow_local_settings
-        from airflow.logging_config import configure_logging
-        from airflow.providers.microsoft.azure.log.wasb_task_handler import 
WasbTaskHandler
-
-        with conf_vars(
-            {
-                ("logging", "remote_logging"): "True",
-                ("logging", "remote_log_conn_id"): "some_wasb",
-                ("logging", "remote_base_log_folder"): "wasb://some-folder",
-            }
-        ):
-            importlib.reload(airflow_local_settings)
-            configure_logging()
-
-        logger = logging.getLogger("airflow.task")
-        assert isinstance(logger.handlers[0], WasbTaskHandler)
-
-    @pytest.mark.parametrize(
-        "remote_base_log_folder, log_group_arn",
-        [
-            (
-                "cloudwatch://arn:aws:logs:aaaa:bbbbb:log-group:ccccc",
-                "arn:aws:logs:aaaa:bbbbb:log-group:ccccc",
-            ),
-            (
-                "cloudwatch://arn:aws:logs:aaaa:bbbbb:log-group:aws/ccccc",
-                "arn:aws:logs:aaaa:bbbbb:log-group:aws/ccccc",
-            ),
-            (
-                
"cloudwatch://arn:aws:logs:aaaa:bbbbb:log-group:/aws/ecs/ccccc",
-                "arn:aws:logs:aaaa:bbbbb:log-group:/aws/ecs/ccccc",
-            ),
-        ],
-    )
-    def test_log_group_arns_remote_logging_with_cloudwatch_handler(
-        self, remote_base_log_folder, log_group_arn
-    ):
-        """Test if the correct ARNs are configured for Cloudwatch"""
-        from airflow.config_templates import airflow_local_settings
-        from airflow.logging_config import configure_logging
-
-        with conf_vars(
-            {
-                ("logging", "remote_logging"): "True",
-                ("logging", "remote_log_conn_id"): "some_cloudwatch",
-                ("logging", "remote_base_log_folder"): remote_base_log_folder,
-            }
-        ):
-            importlib.reload(airflow_local_settings)
-            configure_logging()
-            assert (
-                
airflow_local_settings.DEFAULT_LOGGING_CONFIG["handlers"]["task"]["log_group_arn"]
-                == log_group_arn
-            )
-
-    def test_loading_remote_logging_with_kwargs(self):
-        """Test if logging can be configured successfully with kwargs"""
-        pytest.importorskip("airflow.providers.amazon", reason="'amazon' 
provider not installed")
-        from airflow.config_templates import airflow_local_settings
-        from airflow.logging_config import configure_logging
-        from airflow.providers.amazon.aws.log.s3_task_handler import 
S3TaskHandler
-
-        with conf_vars(
-            {
-                ("logging", "remote_logging"): "True",
-                ("logging", "remote_log_conn_id"): "some_s3",
-                ("logging", "remote_base_log_folder"): "s3://some-folder",
-                ("logging", "remote_task_handler_kwargs"): 
'{"delete_local_copy": true}',
-            }
-        ):
-            importlib.reload(airflow_local_settings)
-            configure_logging()
-
-        logger = logging.getLogger("airflow.task")
-        assert isinstance(logger.handlers[0], S3TaskHandler)
-        assert getattr(logger.handlers[0], "delete_local_copy") is True
diff --git a/airflow-core/tests/unit/core/test_stats.py 
b/airflow-core/tests/unit/core/test_stats.py
index a297492dbf8..cff52656091 100644
--- a/airflow-core/tests/unit/core/test_stats.py
+++ b/airflow-core/tests/unit/core/test_stats.py
@@ -375,11 +375,11 @@ class TestPatternValidatorConfigOption:
 
     @conf_vars({**stats_on, **block_list, ("metrics", "metrics_allow_list"): 
"baz,qux"})
     def test_setting_allow_and_block_logs_warning(self, caplog):
-        importlib.reload(airflow.stats)
-
-        assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient)
-        assert type(airflow.stats.Stats.instance.metrics_validator) is 
PatternAllowListValidator
         with caplog.at_level(logging.WARNING):
+            importlib.reload(airflow.stats)
+
+            assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient)
+            assert type(airflow.stats.Stats.instance.metrics_validator) is 
PatternAllowListValidator
             assert "Ignoring metrics_block_list" in caplog.text
 
 
diff --git a/airflow-core/tests/unit/sensors/test_external_task_sensor.py 
b/airflow-core/tests/unit/sensors/test_external_task_sensor.py
index b8f9b477aa8..947c125f8be 100644
--- a/airflow-core/tests/unit/sensors/test_external_task_sensor.py
+++ b/airflow-core/tests/unit/sensors/test_external_task_sensor.py
@@ -393,8 +393,7 @@ class TestExternalTaskSensor:
             caplog.clear()
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
             assert (
-                f"Poking for tasks ['{TEST_TASK_ID}'] "
-                f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
+                f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on 
{DEFAULT_DATE.isoformat()} ... "
             ) in caplog.messages
 
     def test_external_task_sensor_external_task_ids_param(self, caplog):
@@ -412,8 +411,7 @@ class TestExternalTaskSensor:
             caplog.clear()
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
             assert (
-                f"Poking for tasks ['{TEST_TASK_ID}'] "
-                f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
+                f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on 
{DEFAULT_DATE.isoformat()} ... "
             ) in caplog.messages
 
     def 
test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, 
caplog):
@@ -1431,13 +1429,13 @@ def dag_bag_cyclic():
                 dags.append(dag)
                 task_a = ExternalTaskSensor(
                     task_id=f"task_a_{n}",
-                    external_dag_id=f"dag_{n-1}",
-                    external_task_id=f"task_b_{n-1}",
+                    external_dag_id=f"dag_{n - 1}",
+                    external_task_id=f"task_b_{n - 1}",
                 )
                 task_b = ExternalTaskMarker(
                     task_id=f"task_b_{n}",
-                    external_dag_id=f"dag_{n+1}",
-                    external_task_id=f"task_a_{n+1}",
+                    external_dag_id=f"dag_{n + 1}",
+                    external_task_id=f"task_a_{n + 1}",
                     recursion_depth=3,
                 )
                 task_a >> task_b
@@ -1447,8 +1445,8 @@ def dag_bag_cyclic():
             dags.append(dag)
             task_a = ExternalTaskSensor(
                 task_id=f"task_a_{depth}",
-                external_dag_id=f"dag_{depth-1}",
-                external_task_id=f"task_b_{depth-1}",
+                external_dag_id=f"dag_{depth - 1}",
+                external_task_id=f"task_b_{depth - 1}",
             )
             task_b = ExternalTaskMarker(
                 task_id=f"task_b_{depth}",
diff --git a/airflow-core/tests/unit/utils/log/test_log_reader.py 
b/airflow-core/tests/unit/utils/log/test_log_reader.py
index 150425eb5d1..e4e7bb0ea15 100644
--- a/airflow-core/tests/unit/utils/log/test_log_reader.py
+++ b/airflow-core/tests/unit/utils/log/test_log_reader.py
@@ -18,10 +18,10 @@ from __future__ import annotations
 
 import copy
 import datetime
-import logging
 import os
 import sys
 import tempfile
+import types
 from typing import TYPE_CHECKING
 from unittest import mock
 
@@ -78,16 +78,26 @@ class TestLogView:
 
     @pytest.fixture(autouse=True)
     def configure_loggers(self, log_dir, settings_folder):
-        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
-        logging_config["handlers"]["task"]["base_log_folder"] = log_dir
-        settings_file = os.path.join(settings_folder, 
"airflow_local_settings_test.py")
-        with open(settings_file, "w") as handle:
-            new_logging_file = f"LOGGING_CONFIG = {logging_config}"
-            handle.writelines(new_logging_file)
+        logging_config = {**DEFAULT_LOGGING_CONFIG}
+        logging_config["handlers"] = {**logging_config["handlers"]}
+        logging_config["handlers"]["task"] = {
+            **logging_config["handlers"]["task"],
+            "base_log_folder": log_dir,
+        }
+
+        mod = types.SimpleNamespace()
+        mod.LOGGING_CONFIG = logging_config
+
+        # "Inject" a fake module into sys so it loads it without needing to 
write valid python code
+        sys.modules["airflow_local_settings_test"] = mod
+
         with conf_vars({("logging", "logging_config_class"): 
"airflow_local_settings_test.LOGGING_CONFIG"}):
             settings.configure_logging()
-        yield
-        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        try:
+            yield
+        finally:
+            del sys.modules["airflow_local_settings_test"]
+            settings.configure_logging()
 
     @pytest.fixture(autouse=True)
     def prepare_log_files(self, log_dir):
diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py 
b/airflow-core/tests/unit/utils/test_log_handlers.py
index f94c83fe92b..ba32e250f25 100644
--- a/airflow-core/tests/unit/utils/test_log_handlers.py
+++ b/airflow-core/tests/unit/utils/test_log_handlers.py
@@ -36,6 +36,7 @@ from pydantic import TypeAdapter
 from pydantic.v1.utils import deep_update
 from requests.adapters import Response
 
+from airflow import settings
 from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
 from airflow.executors import executor_constants, executor_loader
 from airflow.jobs.job import Job
@@ -61,7 +62,7 @@ from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.config import conf_vars
 
-pytestmark = pytest.mark.db_test
+pytestmark = [pytest.mark.db_test, pytest.mark.xfail()]
 
 DEFAULT_DATE = datetime(2016, 1, 1)
 TASK_LOGGER = "airflow.task"
@@ -88,8 +89,7 @@ class TestFileTaskLogHandler:
             session.query(TaskInstance).delete()
 
     def setup_method(self):
-        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
-        logging.root.disabled = False
+        settings.configure_logging()
         self.clean_up()
         # We use file task handler by default.
 
diff --git a/airflow-core/tests/unit/utils/test_logging_mixin.py 
b/airflow-core/tests/unit/utils/test_logging_mixin.py
index 14e3532074b..4ad65db7fb1 100644
--- a/airflow-core/tests/unit/utils/test_logging_mixin.py
+++ b/airflow-core/tests/unit/utils/test_logging_mixin.py
@@ -93,18 +93,6 @@ class TestLoggingMixin:
 
         assert DummyClass().log.name == 
"unit.utils.test_logging_mixin.DummyClass"
 
-    def test_logger_name_is_root_when_logger_name_is_empty_string(self):
-        """
-        Ensure that when `_logger_name` is set as an empty string, the 
resulting logger name is an empty
-        string too, which result in a logger with 'root' as name.
-        Note: Passing an empty string to `logging.getLogger` will create a 
logger with name 'root'.
-        """
-
-        class EmptyStringLogger(LoggingMixin):
-            _logger_name: str | None = ""
-
-        assert EmptyStringLogger().log.name == "root"
-
     def test_log_config_logger_name_correctly_prefix_logger_name(self):
         """
         Ensure that when a class has `_log_config_logger_name`, it is used as 
prefix in the final logger
diff --git a/airflow-core/tests/unit/utils/test_process_utils.py 
b/airflow-core/tests/unit/utils/test_process_utils.py
index 7edc5f28b73..47ffc9245f3 100644
--- a/airflow-core/tests/unit/utils/test_process_utils.py
+++ b/airflow-core/tests/unit/utils/test_process_utils.py
@@ -103,19 +103,22 @@ class TestReapProcessGroup:
 class TestExecuteInSubProcess:
     def test_should_print_all_messages1(self, caplog):
         execute_in_subprocess(["bash", "-c", "echo CAT; echo KITTY;"])
-        msgs = [record.getMessage() for record in caplog.records]
-        assert msgs == ["Executing cmd: bash -c 'echo CAT; echo KITTY;'", 
"Output:", "CAT", "KITTY"]
+        assert caplog.messages == [
+            "Executing cmd: bash -c 'echo CAT; echo KITTY;'",
+            "Output:",
+            "CAT",
+            "KITTY",
+        ]
 
     def test_should_print_all_messages_from_cwd(self, caplog, tmp_path):
         execute_in_subprocess(["bash", "-c", "echo CAT; pwd; echo KITTY;"], 
cwd=str(tmp_path))
-        msgs = [record.getMessage() for record in caplog.records]
         assert [
             "Executing cmd: bash -c 'echo CAT; pwd; echo KITTY;'",
             "Output:",
             "CAT",
             str(tmp_path),
             "KITTY",
-        ] == msgs
+        ] == caplog.messages
 
     def test_using_env_works(self, caplog):
         execute_in_subprocess(["bash", "-c", 'echo "My value is ${VALUE}"'], 
env=dict(VALUE="1"))
diff --git 
a/airflow-core/tests/unit/utils/test_task_handler_with_custom_formatter.py 
b/airflow-core/tests/unit/utils/test_task_handler_with_custom_formatter.py
deleted file mode 100644
index 118e442d0c1..00000000000
--- a/airflow-core/tests/unit/utils/test_task_handler_with_custom_formatter.py
+++ /dev/null
@@ -1,115 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import logging
-
-import pytest
-
-from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
-from airflow.models.taskinstance import TaskInstance
-from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.utils.log.logging_mixin import set_context
-from airflow.utils.state import DagRunState
-from airflow.utils.timezone import datetime
-from airflow.utils.types import DagRunType
-
-from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.utils.types import DagRunTriggeredByType
-
-pytestmark = pytest.mark.db_test
-
-
-DEFAULT_DATE = datetime(2019, 1, 1)
-TASK_HANDLER = "task"
-TASK_HANDLER_CLASS = 
"airflow.utils.log.task_handler_with_custom_formatter.TaskHandlerWithCustomFormatter"
-PREV_TASK_HANDLER = DEFAULT_LOGGING_CONFIG["handlers"]["task"]
-
-DAG_ID = "task_handler_with_custom_formatter_dag"
-TASK_ID = "task_handler_with_custom_formatter_task"
-
-
[email protected](scope="module", autouse=True)
-def custom_task_log_handler_config():
-    DEFAULT_LOGGING_CONFIG["handlers"]["task"] = {
-        "class": TASK_HANDLER_CLASS,
-        "formatter": "airflow",
-        "stream": "sys.stdout",
-    }
-    logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
-    logging.root.disabled = False
-    yield
-    DEFAULT_LOGGING_CONFIG["handlers"]["task"] = PREV_TASK_HANDLER
-    logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
-
-
[email protected]
-def task_instance(dag_maker):
-    with dag_maker(DAG_ID, start_date=DEFAULT_DATE, serialized=True) as dag:
-        task = EmptyOperator(task_id=TASK_ID)
-    triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if 
AIRFLOW_V_3_0_PLUS else {}
-    dagrun = dag_maker.create_dagrun(
-        state=DagRunState.RUNNING,
-        logical_date=DEFAULT_DATE,
-        run_type=DagRunType.MANUAL,
-        
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
-        **triggered_by_kwargs,
-    )
-    ti = TaskInstance(task=task, run_id=dagrun.run_id)
-    ti.log.disabled = False
-    yield ti
-    clear_db_runs()
-
-
-def assert_prefix_once(task_instance: TaskInstance, prefix: str) -> None:
-    handler = next((h for h in task_instance.log.handlers if h.name == 
TASK_HANDLER), None)
-    assert handler is not None, "custom task log handler not set up correctly"
-    assert handler.formatter is not None, "custom task log formatter not set 
up correctly"
-    previous_formatter = handler.formatter
-    expected_format = f"{prefix}:{handler.formatter._fmt}"
-    set_context(task_instance.log, task_instance)
-    assert expected_format == handler.formatter._fmt
-    handler.setFormatter(previous_formatter)
-
-
-def assert_prefix_multiple(task_instance: TaskInstance, prefix: str) -> None:
-    handler = next((h for h in task_instance.log.handlers if h.name == 
TASK_HANDLER), None)
-    assert handler is not None, "custom task log handler not set up correctly"
-    assert handler.formatter is not None, "custom task log formatter not set 
up correctly"
-    previous_formatter = handler.formatter
-    expected_format = f"{prefix}:{handler.formatter._fmt}"
-    set_context(task_instance.log, task_instance)
-    set_context(task_instance.log, task_instance)
-    set_context(task_instance.log, task_instance)
-    assert expected_format == handler.formatter._fmt
-    handler.setFormatter(previous_formatter)
-
-
-def test_custom_formatter_default_format(task_instance):
-    """The default format provides no prefix."""
-    assert_prefix_once(task_instance, "")
-
-
-@conf_vars({("logging", "task_log_prefix_template"): "{{ ti.dag_id }}-{{ 
ti.task_id }}"})
-def test_custom_formatter_custom_format_not_affected_by_config(task_instance):
-    """Certifies that the prefix is only added once, even after repeated 
calls"""
-    assert_prefix_multiple(task_instance, f"{DAG_ID}-{TASK_ID}")
diff --git a/devel-common/src/tests_common/pytest_plugin.py 
b/devel-common/src/tests_common/pytest_plugin.py
index ad885d176c7..3a1ac156170 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import json
+import logging
 import os
 import platform
 import re
@@ -1767,7 +1768,7 @@ def add_expected_folders_to_pythonpath():
 
 
 @pytest.fixture
-def cap_structlog():
+def cap_structlog(request, monkeypatch):
     """
     Test that structlog messages are logged.
 
@@ -1782,59 +1783,46 @@ def cap_structlog():
     ...
     ...     assert "not logged" not in cap_structlog  # not in works too
     """
-    import structlog.testing
-    from structlog import configure, get_config
-
-    class LogCapture(structlog.testing.LogCapture):
-        def __contains__(self, target):
-            import operator
-
-            if isinstance(target, str):
-
-                def predicate(e):
-                    return e["event"] == target
-            elif isinstance(target, dict):
-                # Partial comparison -- only check keys passed in
-                get = operator.itemgetter(*target.keys())
-                want = tuple(target.values())
-
-                def predicate(e):
-                    try:
-                        return get(e) == want
-                    except Exception:
-                        return False
-            else:
-                raise TypeError(f"Can't search logs using {type(target)}")
-
-            return any(predicate(e) for e in self.entries)
-
-        def __getitem__(self, i):
-            return self.entries[i]
+    import structlog.stdlib
+    from structlog import DropEvent, configure, get_config
 
-        def __iter__(self):
-            return iter(self.entries)
+    from tests_common.test_utils.logs import StructlogCapture
 
-        def __repr__(self):
-            return repr(self.entries)
-
-        @property
-        def text(self):
-            """All the event text as a single multi-line string."""
-            return "\n".join(e["event"] for e in self.entries)
-
-    cap = LogCapture()
+    cap = StructlogCapture()
     # Modify `_Configuration.default_processors` set via `configure` but always
     # keep the list instance intact to not break references held by bound
     # loggers.
     processors = get_config()["processors"]
     old_processors = processors.copy()
+
+    # And modify the stdlib logging to capture too
+    handler = logging.root.handlers[0]
+    if not isinstance(handler.formatter, structlog.stdlib.ProcessorFormatter):
+        raise AssertionError(
+            f"{type(handler.formatter)} is not an instance of 
structlog.stblid.ProcessorFormatter"
+        )
+
+    std_formatter = structlog.stdlib.ProcessorFormatter(
+        foreign_pre_chain=handler.formatter.foreign_pre_chain,
+        pass_foreign_args=True,
+        processor=cap,
+    )
+
+    def stdlib_filter(record):
+        with suppress(DropEvent):
+            std_formatter.format(record)
+        return False
+
     try:
         # clear processors list and use LogCapture for testing
         processors.clear()
         processors.append(cap)
         configure(processors=processors)
+        monkeypatch.setattr(handler, "level", logging.DEBUG)
+        monkeypatch.setattr(handler, "filters", [stdlib_filter])
         yield cap
     finally:
+        cap._finalize()
         # remove LogCapture and restore original processors
         processors.clear()
         processors.extend(old_processors)
@@ -1849,6 +1837,8 @@ def override_caplog(request):
     This is in an effort to reduce flakiness from caplog related tests where 
one test file can change log
     behaviour and bleed in to affecting other test files
     """
+    yield request.getfixturevalue("cap_structlog")
+    return
     # We need this `_ispytest` so it doesn't warn about using private
     fixture = pytest.LogCaptureFixture(request.node, _ispytest=True)
     yield fixture
diff --git a/devel-common/src/tests_common/test_utils/logs.py 
b/devel-common/src/tests_common/test_utils/logs.py
index 03cbe0b55b9..511d8f87b5d 100644
--- a/devel-common/src/tests_common/test_utils/logs.py
+++ b/devel-common/src/tests_common/test_utils/logs.py
@@ -18,10 +18,16 @@
 from __future__ import annotations
 
 import json
+import logging
+from contextlib import contextmanager
+from typing import TYPE_CHECKING, NoReturn
 
 from airflow.models import Log
 from airflow.sdk.execution_time.secrets_masker import DEFAULT_SENSITIVE_FIELDS
 
+if TYPE_CHECKING:
+    from structlog.typing import EventDict, WrappedLogger
+
 
 def _masked_value_check(data, sensitive_fields):
     """
@@ -69,3 +75,188 @@ def check_last_log(session, dag_id, event, logical_date, 
expected_extra=None, ch
         _masked_value_check(extra_json, DEFAULT_SENSITIVE_FIELDS)
 
     session.query(Log).delete()
+
+
+class StructlogCapture:
+    """
+    Test that structlog messages are logged.
+
+    This extends the feature built in to structlog to make it easier to find 
if a message is logged.
+
+    >>> def test_something(cap_structlog):
+    ...     log.info("some event", field1=False, field2=[1, 2])
+    ...     log.info("some event", field1=True)
+    ...     assert "some_event" in cap_structlog  # a string searches on 
`event` field
+    ...     assert {"event": "some_event", "field1": True} in cap_structlog  # 
Searches only on passed fields
+    ...     assert {"field2": [1, 2]} in cap_structlog
+    ...
+    ...     assert "not logged" not in cap_structlog  # not in works too
+
+    This fixture class will also manage the log level of stdlib loggers via 
``at_level`` and ``set_level``.
+    """
+
+    # This class is a manual mixing of pytest's LogCaptureFixture and 
structlog's LogCapture class, but
+    # tailored to Airflow's "send all logs via structlog" approach
+
+    _logger: str | None = None
+    """The logger we specifically want to capture log messages from"""
+
+    def __init__(self):
+        self.entries = []
+        self._initial_logger_levels: dict[str | None, int] = {}
+
+    def _finalize(self) -> None:
+        """
+        Finalize the fixture.
+
+        This restores the log levels and the disabled logging levels changed 
by :meth:`set_level`.
+        """
+        from airflow._logging.structlog import PER_LOGGER_LEVELS
+
+        for logger_name, level in self._initial_logger_levels.items():
+            logger = logging.getLogger(logger_name)
+            logger.setLevel(level)
+            if level is logging.NOTSET:
+                del PER_LOGGER_LEVELS[logger_name]
+            else:
+                PER_LOGGER_LEVELS[logger_name] = level
+
+    def __contains__(self, target):
+        import operator
+
+        if isinstance(target, str):
+
+            def predicate(e):
+                return e["event"] == target
+        elif isinstance(target, dict):
+            # Partial comparison -- only check keys passed in
+            get = operator.itemgetter(*target.keys())
+            want = tuple(target.values())
+
+            def predicate(e):
+                try:
+                    return get(e) == want
+                except Exception:
+                    return False
+        else:
+            raise TypeError(f"Can't search logs using {type(target)}")
+
+        return any(predicate(e) for e in self.entries)
+
+    def __getitem__(self, i):
+        return self.entries[i]
+
+    def __iter__(self):
+        return iter(self.entries)
+
+    def __repr__(self):
+        return repr(self.entries)
+
+    def __call__(self, logger: WrappedLogger, method_name: str, event_dict: 
EventDict) -> NoReturn:
+        from structlog import DropEvent
+        from structlog._log_levels import map_method_name
+
+        from airflow._logging.structlog import (
+            NamedBytesLogger,
+            NamedWriteLogger,
+        )
+
+        logger_name = (
+            event_dict.get("logger_name")
+            or event_dict.get("logger")
+            or (isinstance(logger, (NamedBytesLogger, NamedWriteLogger)) and 
logger.name)
+            or ""
+        )
+        if not self._logger or logger_name.startswith(self._logger):
+            event_dict["log_level"] = map_method_name(method_name)
+            self.entries.append(event_dict)
+
+        raise DropEvent
+
+    @property
+    def text(self):
+        """All the event text as a single multi-line string."""
+        return "\n".join(e["event"] for e in self.entries)
+
+    # These next fns make it duck-type the same as Pytests "caplog" fixture
+    @property
+    def messages(self):
+        """All the event messages as a list."""
+        return [e["event"] for e in self.entries]
+
+    def _force_enable_logging(self, level: int, logger_obj: logging.Logger) -> 
int:
+        """
+        Enable the desired logging level if the global level was disabled via 
``logging.disabled``.
+
+        Only enables logging levels greater than or equal to the requested 
``level``.
+
+        Does nothing if the desired ``level`` wasn't disabled.
+
+        :param level:
+            The logger level caplog should capture.
+            All logging is enabled if a non-standard logging level string is 
supplied.
+            Valid level strings are in :data:`logging._nameToLevel`.
+        :param logger_obj: The logger object to check.
+
+        :return: The original disabled logging level.
+        """
+        original_disable_level: int = logger_obj.manager.disable
+
+        if not logger_obj.isEnabledFor(level):
+            # Each level is `10` away from other levels.
+            # https://docs.python.org/3/library/logging.html#logging-levels
+            disable_level = max(level - 10, logging.NOTSET)
+            logging.disable(disable_level)
+
+        return original_disable_level
+
+    @contextmanager
+    def at_level(self, level: str | int, logger=None):
+        from airflow._logging.structlog import NAME_TO_LEVEL, PER_LOGGER_LEVELS
+
+        if isinstance(level, str):
+            level = NAME_TO_LEVEL[level.lower()]
+
+        key = logger or ""
+        old = PER_LOGGER_LEVELS.get(key, logging.NOTSET)
+        PER_LOGGER_LEVELS[key] = level
+        stdlogger = logging.getLogger(key)
+        stdlogger.setLevel(level)
+        hdlr = orig_hdlr_level = None
+        if stdlogger.handlers:
+            hdlr = stdlogger.handlers[0]
+            orig_hdlr_level = hdlr.level
+            hdlr.setLevel(level)
+        try:
+            yield self
+        finally:
+            stdlogger.setLevel(old)
+            if hdlr is not None:
+                hdlr.setLevel(orig_hdlr_level)
+            if old is logging.NOTSET:
+                del PER_LOGGER_LEVELS[key]
+            else:
+                PER_LOGGER_LEVELS[key] = old
+
+    def set_level(self, level: str | int, logger=None):
+        from airflow._logging.structlog import NAME_TO_LEVEL, PER_LOGGER_LEVELS
+
+        # Set the global level
+        if isinstance(level, str):
+            level = NAME_TO_LEVEL[level.lower()]
+
+        key = logger or ""
+
+        stdlogger = logging.getLogger(key)
+        self._initial_logger_levels[key] = PER_LOGGER_LEVELS.get(key, 
logging.NOTSET)
+
+        PER_LOGGER_LEVELS[key] = level
+        stdlogger.setLevel(level)
+        self._logger = logger
+
+    def clear(self):
+        self.entries = []
+
+    @property
+    def records(self):
+        return self.entries

Reply via email to