This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch structlog-in-logging-mixin
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0b37eff717583c4d5446bc7c966f1d5150d8bcb0
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Mar 6 15:06:53 2025 +0000

    WIP: switch all airflow logging to structlog
---
 airflow/_logging/__init__.py  |   0
 airflow/_logging/structlog.py | 434 ++++++++++++++++++++++++++++++++++++++++++
 hatch_build.py                |   2 +
 task_sdk/pyproject.toml       |   2 +-
 4 files changed, 437 insertions(+), 1 deletion(-)

diff --git a/airflow/_logging/__init__.py b/airflow/_logging/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/airflow/_logging/structlog.py b/airflow/_logging/structlog.py
new file mode 100644
index 00000000000..c9251639736
--- /dev/null
+++ b/airflow/_logging/structlog.py
@@ -0,0 +1,434 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import itertools
+import logging
+import os
+import sys
+import warnings
+from functools import cache
+from typing import Any, BinaryIO, Callable, Generic, Protocol, TextIO, 
TypeVar, Union
+
+import pygtrie
+import structlog
+from structlog.processors import NAME_TO_LEVEL
+from structlog.typing import EventDict, ExcInfo, FilteringBoundLogger, 
Processor, WrappedLogger
+
# Module-level stdlib logger; structlog wiring happens later in configure_structlog().
log = logging.getLogger(__name__)
+
+
class AirflowFilteringBoundLogger(FilteringBoundLogger, Protocol):
    """Protocol for structlog bound loggers that also expose the stdlib-``logging`` query API."""

    def isEnabledFor(self, level: int) -> bool: ...
    def getEffectiveLevel(self) -> int: ...
+
+
# Registry of bound-logger classes keyed by the numeric minimum level they
# filter at; populated by _make_airflow_structlogger() below.
LEVEL_TO_FILTERING_LOGGER: dict[int, type[AirflowFilteringBoundLogger]] = {}
+
+
def _make_airflow_structlogger(min_level):
    """Build a level-filtering bound-logger class with stdlib-compatible helpers.

    The class filters at ``min_level`` (via structlog's native filtering base)
    and additionally answers ``isEnabledFor``/``getEffectiveLevel`` in the
    stdlib-``logging`` spelling.  It is registered in
    ``LEVEL_TO_FILTERING_LOGGER`` and returned.

    Modelled on structlog's own per-level class factory in ``_native.py``.
    """
    level_names = {num: name for name, num in NAME_TO_LEVEL.items()}
    suffix = level_names.get(min_level, "Notset").capitalize()

    def _is_enabled_for(self: Any, level):
        # stdlib spelling delegating to structlog's is_enabled_for()
        return self.is_enabled_for(level)

    def _get_effective_level(self: Any):
        # stdlib spelling delegating to structlog's get_effective_level()
        return self.get_effective_level()

    namespace = {
        "isEnabledFor": _is_enabled_for,
        "getEffectiveLevel": _get_effective_level,
    }
    cls = type(
        f"AirflowBoundLoggerFilteringAt{suffix}",
        (structlog.make_filtering_bound_logger(min_level),),
        namespace,
    )
    LEVEL_TO_FILTERING_LOGGER[min_level] = cls
    return cls
+
+
# Pre-build one bound-logger class per standard level so make_filtering_logger()
# can fetch them from LEVEL_TO_FILTERING_LOGGER by numeric level at run time.
AirflowBoundLoggerFilteringAtNotset = _make_airflow_structlogger(NAME_TO_LEVEL["notset"])
AirflowBoundLoggerFilteringAtDebug = _make_airflow_structlogger(NAME_TO_LEVEL["debug"])
AirflowBoundLoggerFilteringAtInfo = _make_airflow_structlogger(NAME_TO_LEVEL["info"])
AirflowBoundLoggerFilteringAtWarning = _make_airflow_structlogger(NAME_TO_LEVEL["warning"])
AirflowBoundLoggerFilteringAtError = _make_airflow_structlogger(NAME_TO_LEVEL["error"])
AirflowBoundLoggerFilteringAtCritical = _make_airflow_structlogger(NAME_TO_LEVEL["critical"])
+
# We use a trie structure so that we can easily and quickly find the most suitable log level to apply:
# longest_prefix("airflow.foo.bar") walks the dotted name to the deepest configured ancestor.
PER_LOGGER_LEVELS = pygtrie.StringTrie(separator=".")
PER_LOGGER_LEVELS.update(
    {
        # NOTE(review): "airflow.foobar" and "baz" look like WIP/example
        # entries — confirm these should ship, or wire this table to real config.
        "airflow": NAME_TO_LEVEL["info"],
        "airflow.foobar": NAME_TO_LEVEL["debug"],
        "baz": NAME_TO_LEVEL["warning"],
    }
)
+
+
def make_filtering_logger(default_min_level: int | str) -> Callable[..., AirflowFilteringBoundLogger]:
    """Return a ``wrapper_class`` factory suitable for :func:`structlog.configure`.

    :param default_min_level: numeric level or level name to filter at when no
        per-logger override applies.
    :return: a callable structlog invokes to wrap each raw logger.
    """
    if isinstance(default_min_level, str):
        default_min_level = NAME_TO_LEVEL[default_min_level.lower()]

    def maker(logger: WrappedLogger, *args, **kwargs) -> AirflowFilteringBoundLogger:
        # If the logger is a NamedBytesLogger/NamedWriteLogger (an Airflow
        # specific subclass) then look up the global per-logger config and
        # redirect to a class filtering at the configured level.
        level = default_min_level
        if isinstance(logger, (NamedWriteLogger, NamedBytesLogger)) and logger.name:
            # Guard on a truthy name: unnamed loggers (name=None) would crash
            # StringTrie.longest_prefix and have no override to find anyway.
            level = PER_LOGGER_LEVELS.longest_prefix(logger.name).get(default_min_level)
        return LEVEL_TO_FILTERING_LOGGER[level](logger, *args, **kwargs)

    return maker
+
+
class NamedBytesLogger(structlog.BytesLogger):
    """A ``structlog.BytesLogger`` that additionally carries a logger name.

    The name lets ``make_filtering_logger`` apply per-logger level overrides
    and lets the ``logger_name`` processor attach it to each event.
    """

    __slots__ = ("name",)

    def __init__(self, name: str | None = None, file: BinaryIO | None = None):
        # `name` is metadata only; output still goes to `file`
        # (or BytesLogger's default stream when None).
        self.name = name
        super().__init__(file)
+
+
class NamedWriteLogger(structlog.WriteLogger):
    """A ``structlog.WriteLogger`` that additionally carries a logger name.

    The name lets ``make_filtering_logger`` apply per-logger level overrides
    and lets the ``logger_name`` processor attach it to each event.
    """

    __slots__ = ("name",)

    def __init__(self, name: str | None = None, file: TextIO | None = None):
        # `name` is metadata only; output still goes to `file`
        # (or WriteLogger's default stream when None).
        self.name = name
        super().__init__(file)
+
+
LogOutputType = TypeVar("LogOutputType", bound=Union[TextIO, BinaryIO])


class LoggerFactory(Generic[LogOutputType]):
    """A structlog ``logger_factory`` building named loggers over a fixed stream.

    :param cls: logger constructor (e.g. ``NamedWriteLogger``/``NamedBytesLogger``)
        called with ``(name, io)`` for every logger requested.
    :param io: stream handed to each constructed logger; ``None`` lets the
        constructor pick its own default stream.
    """

    # Fixed two annotation bugs: the original ``Callable[[str, LogOutputType | None]]``
    # was malformed (missing return type), and ``__call__`` was annotated as
    # returning the IO stream when it actually returns the constructed logger.
    def __init__(
        self,
        cls: Callable[[str | None, LogOutputType | None], WrappedLogger],
        io: LogOutputType | None = None,
    ):
        self.cls = cls
        self.io = io

    def __call__(self, logger_name: str | None = None, *args: Any) -> WrappedLogger:
        # structlog may pass extra positional args from get_logger(); only the
        # logger name keyword is meaningful for our Named*Logger classes.
        return self.cls(logger_name, self.io)
+
+
def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
    """Copy the logger's name into ``event_dict["logger"]`` unless already set.

    An explicit ``logger_name`` key takes precedence (and is always removed
    from the event); otherwise the wrapped logger's ``name`` attribute is used.
    """
    name = event_dict.pop("logger_name", None) or getattr(logger, "name", None)
    if name:
        event_dict.setdefault("logger", name)
    return event_dict
+
+
+def exception_group_tracebacks(
+    format_exception: Callable[[ExcInfo], list[dict[str, Any]]],
+) -> Processor:
+    # Make mypy happy
+    if not hasattr(__builtins__, "BaseExceptionGroup"):
+        T = TypeVar("T")
+
+        class BaseExceptionGroup(Generic[T]):
+            exceptions: list[T]
+
+    def _exception_group_tracebacks(logger: Any, method_name: Any, event_dict: 
EventDict) -> EventDict:
+        if exc_info := event_dict.get("exc_info", None):
+            group: BaseExceptionGroup[Exception] | None = None
+            if exc_info is True:
+                # `log.exception('mesg")` case
+                exc_info = sys.exc_info()
+                if exc_info[0] is None:
+                    exc_info = None
+
+            if (
+                isinstance(exc_info, tuple)
+                and len(exc_info) == 3
+                and isinstance(exc_info[1], BaseExceptionGroup)
+            ):
+                group = exc_info[1]
+            elif isinstance(exc_info, BaseExceptionGroup):
+                group = exc_info
+
+            if group:
+                # Only remove it from event_dict if we handle it
+                del event_dict["exc_info"]
+                event_dict["exception"] = list(
+                    itertools.chain.from_iterable(
+                        format_exception((type(exc), exc, exc.__traceback__))  
# type: ignore[attr-defined,arg-type]
+                        for exc in (*group.exceptions, group)
+                    )
+                )
+
+        return event_dict
+
+    return _exception_group_tracebacks
+
+
def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """Mask any string value that looks like a JWT token.

    ``eyJ`` is how ``{"`` begins when base64-encoded, so any value starting
    that way is in all likelihood a JWT. Better safe than sorry.
    """
    suspect_keys = [
        key
        for key, value in event_dict.items()
        if isinstance(value, str) and value.startswith("eyJ")
    ]
    for key in suspect_keys:
        event_dict[key] = "eyJ***"
    return event_dict
+
+
def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
    """Strip structlog's ``positional_args`` key (if present) before rendering."""
    if "positional_args" in event_dict:
        del event_dict["positional_args"]
    return event_dict
+
+
@cache
def structlog_processors(
    json_output: bool,
):
    """Build the shared structlog processor chain for one output mode.

    :param json_output: JSON-lines output when True, colored console otherwise.
    :return: ``(processors, named)`` where ``named`` exposes the individual
        processors that :func:`configure_structlog` re-uses when wiring up the
        stdlib ``logging`` formatters.  Cached: the chain is immutable per mode.
    """
    if json_output:
        timestamper = structlog.processors.MaybeTimeStamper(fmt="iso")
    else:
        timestamper = structlog.processors.MaybeTimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f")

    processors: list[structlog.typing.Processor] = [
        timestamper,
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        logger_name,
        redact_jwt,
        structlog.processors.StackInfoRenderer(),
    ]

    # Imports to suppress showing code from these modules. We need the import
    # to get the filepath for structlog to ignore.
    import contextlib

    import click
    import httpcore
    import httpx

    # Fixed: httpx was previously listed twice in this tuple.
    suppress = (
        click,
        contextlib,
        httpcore,
        httpx,
    )

    if json_output:
        dict_exc_formatter = structlog.tracebacks.ExceptionDictTransformer(
            use_rich=False, show_locals=False, suppress=suppress
        )

        dict_tracebacks = structlog.processors.ExceptionRenderer(dict_exc_formatter)
        # BaseExceptionGroup is a builtin from Python 3.11.  Fixed: the old
        # `hasattr(__builtins__, ...)` check was always False in imported
        # modules (__builtins__ is a dict there), silently disabling
        # exception-group rendering even on 3.11+.
        if sys.version_info >= (3, 11):
            exc_group_processor = exception_group_tracebacks(dict_exc_formatter)
            processors.append(exc_group_processor)
        else:
            exc_group_processor = None

        import msgspec

        def json_dumps(msg, default):
            # Note: this is likely an "expensive" step, but lets massage the dict order for nice
            # viewing of the raw JSON logs.
            # Maybe we don't need this once the UI renders the JSON instead of displaying the raw text
            msg = {
                "timestamp": msg.pop("timestamp"),
                "level": msg.pop("level"),
                "event": msg.pop("event"),
                **msg,
            }
            return msgspec.json.encode(msg, enc_hook=default)

        def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
            # Stdlib logging doesn't need the re-ordering, it's fine as it is
            return msgspec.json.encode(event_dict).decode("utf-8")

        json = structlog.processors.JSONRenderer(serializer=json_dumps)

        processors.extend(
            (
                dict_tracebacks,
                structlog.processors.UnicodeDecoder(),
                json,
            ),
        )

        return processors, {
            "timestamper": timestamper,
            "exc_group_processor": exc_group_processor,
            "dict_tracebacks": dict_tracebacks,
            "json": json_processor,
        }
    else:
        rich_exc_formatter = structlog.dev.RichTracebackFormatter(
            # These values are picked somewhat arbitrarily to produce useful-but-compact tracebacks. If
            # we ever need to change these then they should be configurable.
            extra_lines=0,
            max_frames=30,
            indent_guides=False,
            suppress=suppress,
        )
        my_styles = structlog.dev.ConsoleRenderer.get_default_level_styles()
        my_styles["debug"] = structlog.dev.CYAN

        console = structlog.dev.ConsoleRenderer(
            exception_formatter=rich_exc_formatter, level_styles=my_styles
        )
        processors.append(console)
        return processors, {
            "timestamper": timestamper,
            "console": console,
        }
+
+
@cache
def configure_structlog(
    json_output: bool = False,
    log_level: str = "DEBUG",
    cache_logger_on_first_use: bool = True,
):
    """
    Set up struct logging and stdlib logging config.

    Configures structlog itself, captures Python warnings, and installs a
    ``logging.dictConfig`` so records from plain stdlib loggers are rendered
    through the same processor chain.  ``@cache`` makes repeat calls with the
    same arguments no-ops.

    :param json_output: emit JSON lines instead of colored console output
    :param log_level: level name applied to structlog and the "airflow" stdlib logger
    :param cache_logger_on_first_use: structlog logger caching (forced off under pytest)
    """
    lvl = NAME_TO_LEVEL[log_level.lower()]

    # "plain"/"colored" are the dictConfig formatter names defined below.
    if json_output:
        formatter = "plain"
    else:
        formatter = "colored"
    processors, named = structlog_processors(json_output)
    timestamper = named["timestamper"]

    pre_chain: list[structlog.typing.Processor] = [
        # Add the log level and a timestamp to the event_dict if the log entry
        # is not from structlog.
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        timestamper,
        structlog.contextvars.merge_contextvars,
        redact_jwt,
    ]

    # Don't cache the loggers during tests, it makes it hard to capture them
    if "PYTEST_CURRENT_TEST" in os.environ:
        cache_logger_on_first_use = False

    # Processor tails for the two ProcessorFormatter wrappers declared in the
    # dictConfig below; the mode-specific renderer is appended further down.
    color_formatter: list[structlog.typing.Processor] = [
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        drop_positional_args,
    ]
    std_lib_formatter: list[structlog.typing.Processor] = [
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        drop_positional_args,
    ]

    wrapper_class = make_filtering_logger(lvl)
    if json_output:
        structlog.configure(
            processors=processors,
            cache_logger_on_first_use=cache_logger_on_first_use,
            wrapper_class=wrapper_class,
            logger_factory=LoggerFactory(NamedBytesLogger),
        )

        # exc_group_processor may be None (see structlog_processors); only
        # install it when it exists.
        if processor := named["exc_group_processor"]:
            pre_chain.append(processor)
        pre_chain.append(named["dict_tracebacks"])
        color_formatter.append(named["json"])
        std_lib_formatter.append(named["json"])
    else:
        structlog.configure(
            processors=processors,
            cache_logger_on_first_use=cache_logger_on_first_use,
            wrapper_class=wrapper_class,
            logger_factory=LoggerFactory(NamedWriteLogger),
        )
        color_formatter.append(named["console"])

    global _warnings_showwarning

    if _warnings_showwarning is None:
        _warnings_showwarning = warnings.showwarning
        # Capture warnings and show them via structlog
        warnings.showwarning = _showwarning

    import logging.config

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "plain": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processors": std_lib_formatter,
                    "foreign_pre_chain": pre_chain,
                    "pass_foreign_args": True,
                },
                "colored": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processors": color_formatter,
                    "foreign_pre_chain": pre_chain,
                    "pass_foreign_args": True,
                },
            },
            "handlers": {
                "default": {
                    "level": log_level.upper(),
                    "class": "logging.StreamHandler",
                    "formatter": formatter,
                },
            },
            "loggers": {
                # Set Airflow logging to the level requested, but almost everything else at "INFO"
                "": {
                    "handlers": ["default"],
                    "level": "INFO",
                    "propagate": True,
                },
                "airflow": {"level": log_level.upper()},
                # These ones are too chatty even at info
                "httpx": {"level": "WARN"},
                "sqlalchemy.engine": {"level": "WARN"},
            },
        }
    )
+
+
# Original warnings.showwarning, saved by configure_structlog() so _showwarning
# can delegate to it when a `file` argument is passed; None until capture is installed.
_warnings_showwarning: Any = None
+
+
def _showwarning(
    message: Warning | str,
    category: type[Warning],
    filename: str,
    lineno: int,
    file: TextIO | None = None,
    line: str | None = None,
) -> Any:
    """
    Redirects warnings to structlog so they appear in task logs etc.

    Mirrors stdlib ``logging.captureWarnings`` behaviour: when an explicit
    ``file`` is supplied, the original ``warnings.showwarning`` implementation
    (saved in ``_warnings_showwarning``) is invoked; otherwise the warning is
    emitted at warning level through a structlog logger named "py.warnings".
    """
    if file is None:
        warn_log = structlog.get_logger(logger_name="py.warnings")
        warn_log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)
    elif _warnings_showwarning is not None:
        _warnings_showwarning(message, category, filename, lineno, file, line)
diff --git a/hatch_build.py b/hatch_build.py
index 6b8294d1d75..3c757ae794f 100644
--- a/hatch_build.py
+++ b/hatch_build.py
@@ -255,6 +255,7 @@ DEPENDENCIES = [
     # Pygments 2.19.0 improperly renders .ini files with dictionaries as values
     # See https://github.com/pygments/pygments/issues/2834
     "pygments>=2.0.1,!=2.19.0",
+    "pygtrie>=2.5.0,<3.0",
     "pyjwt>=2.0.0",
     "python-daemon>=3.0.0",
     "python-dateutil>=2.7.0",
@@ -274,6 +275,7 @@ DEPENDENCIES = [
     "sqlalchemy>=1.4.49,<2.0",
     "sqlalchemy-jsonfield>=1.0",
     "sqlalchemy-utils>=0.41.2",
+    "structlog>=25.1.0",
     "tabulate>=0.7.5",
     "tenacity>=8.0.0,!=8.2.0",
     "termcolor>=2.5.0",
diff --git a/task_sdk/pyproject.toml b/task_sdk/pyproject.toml
index d980c0aae82..624a64a3258 100644
--- a/task_sdk/pyproject.toml
+++ b/task_sdk/pyproject.toml
@@ -32,7 +32,7 @@ dependencies = [
     'pendulum>=3.0.0,<4.0;python_version>="3.12"',
     "python-dateutil>=2.7.0",
     "psutil>=6.1.0",
-    "structlog>=24.4.0",
+    "structlog>=25.1.0",
     "retryhttp>=1.2.0,!=1.3.0",
 ]
 classifiers = [

Reply via email to