This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch structlog-in-logging-mixin
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0b37eff717583c4d5446bc7c966f1d5150d8bcb0
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Mar 6 15:06:53 2025 +0000

    WIP: switch all airflow logging to structlog
---
 airflow/_logging/__init__.py  |   0
 airflow/_logging/structlog.py | 434 ++++++++++++++++++++++++++++++++++++++++++
 hatch_build.py                |   2 +
 task_sdk/pyproject.toml       |   2 +-
 4 files changed, 437 insertions(+), 1 deletion(-)

diff --git a/airflow/_logging/__init__.py b/airflow/_logging/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/airflow/_logging/structlog.py b/airflow/_logging/structlog.py
new file mode 100644
index 00000000000..c9251639736
--- /dev/null
+++ b/airflow/_logging/structlog.py
@@ -0,0 +1,434 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import itertools
+import logging
+import os
+import sys
+import warnings
+from functools import cache
+from typing import Any, BinaryIO, Callable, Generic, Protocol, TextIO, TypeVar, Union
+
+import pygtrie
+import structlog
+from structlog.processors import NAME_TO_LEVEL
+from structlog.typing import EventDict, ExcInfo, FilteringBoundLogger, Processor, WrappedLogger
+
+log = logging.getLogger(__name__)
+
+
+class AirflowFilteringBoundLogger(FilteringBoundLogger, Protocol):
+    def isEnabledFor(self, level: int): ...
+    def getEffectiveLevel(self) -> int: ...
+
+
+LEVEL_TO_FILTERING_LOGGER: dict[int, type[AirflowFilteringBoundLogger]] = {}
+
+
+def _make_airflow_structlogger(min_level):
+    # This uses https://github.com/hynek/structlog/blob/2f0cc42d/src/structlog/_native.py#L126
+    # as inspiration
+
+    LEVEL_TO_NAME = {v: k for k, v in NAME_TO_LEVEL.items()}
+
+    def isEnabledFor(self: Any, level):
+        return self.is_enabled_for(level)
+
+    def getEffectiveLevel(self: Any):
+        return self.get_effective_level()
+
+    base = structlog.make_filtering_bound_logger(min_level)
+
+    cls = type(
+        f"AirflowBoundLoggerFilteringAt{LEVEL_TO_NAME.get(min_level, 'Notset').capitalize()}",
+        (base,),
+        {"isEnabledFor": isEnabledFor, "getEffectiveLevel": getEffectiveLevel},
+    )
+    LEVEL_TO_FILTERING_LOGGER[min_level] = cls
+    return cls
+
+
+AirflowBoundLoggerFilteringAtNotset = _make_airflow_structlogger(NAME_TO_LEVEL["notset"])
+AirflowBoundLoggerFilteringAtDebug = _make_airflow_structlogger(NAME_TO_LEVEL["debug"])
+AirflowBoundLoggerFilteringAtInfo = _make_airflow_structlogger(NAME_TO_LEVEL["info"])
+AirflowBoundLoggerFilteringAtWarning = _make_airflow_structlogger(NAME_TO_LEVEL["warning"])
+AirflowBoundLoggerFilteringAtError = _make_airflow_structlogger(NAME_TO_LEVEL["error"])
+AirflowBoundLoggerFilteringAtCritical = _make_airflow_structlogger(NAME_TO_LEVEL["critical"])
+
+# We use a trie structure so that we can easily and quickly find the most suitable log level to apply
+PER_LOGGER_LEVELS = pygtrie.StringTrie(separator=".")
+PER_LOGGER_LEVELS.update(
+    {
+        "airflow": NAME_TO_LEVEL["info"],
+        "airflow.foobar": NAME_TO_LEVEL["debug"],
+        "baz": NAME_TO_LEVEL["warning"],
+    }
+)
+
+
+def make_filtering_logger(default_min_level: int | str) -> Callable[..., AirflowFilteringBoundLogger]:
+    if isinstance(default_min_level, str):
+        default_min_level = NAME_TO_LEVEL[default_min_level.lower()]
+
+    def maker(logger: WrappedLogger, *args, **kwargs):
+        # If the logger is a NamedBytesLogger/NamedWriteLogger (an Airflow specific subclass) then
+        # look up the global per-logger config and redirect to a new class.
+
+        if isinstance(logger, (NamedWriteLogger, NamedBytesLogger)):
+            logger_name = logger.name
+            level = PER_LOGGER_LEVELS.longest_prefix(logger_name).get(default_min_level)
+        else:
+            level = default_min_level
+        return LEVEL_TO_FILTERING_LOGGER[level](logger, *args, **kwargs)
+
+    return maker
+
+
+class NamedBytesLogger(structlog.BytesLogger):
+    __slots__ = ("name",)
+
+    def __init__(self, name: str | None = None, file: BinaryIO | None = None):
+        self.name = name
+        super().__init__(file)
+
+
+class NamedWriteLogger(structlog.WriteLogger):
+    __slots__ = ("name",)
+
+    def __init__(self, name: str | None = None, file: TextIO | None = None):
+        self.name = name
+        super().__init__(file)
+
+
+LogOutputType = TypeVar("LogOutputType", bound=Union[TextIO, BinaryIO])
+
+
+class LoggerFactory(Generic[LogOutputType]):
+    def __init__(self, cls: Callable[[str, LogOutputType | None]], io: LogOutputType | None = None):
+        self.cls = cls
+        self.io = io
+
+    def __call__(self, logger_name: str | None = None, *args: Any) -> LogOutputType:
+        return self.cls(logger_name, self.io)
+
+
+def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
+    if logger_name := (event_dict.pop("logger_name", None) or getattr(logger, "name", None)):
+        event_dict.setdefault("logger", logger_name)
+    return event_dict
+
+
+def exception_group_tracebacks(
+    format_exception: Callable[[ExcInfo], list[dict[str, Any]]],
+) -> Processor:
+    # Make mypy happy
+    if not hasattr(__builtins__, "BaseExceptionGroup"):
+        T = TypeVar("T")
+
+        class BaseExceptionGroup(Generic[T]):
+            exceptions: list[T]
+
+    def _exception_group_tracebacks(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
+        if exc_info := event_dict.get("exc_info", None):
+            group: BaseExceptionGroup[Exception] | None = None
+            if exc_info is True:
+                # `log.exception("mesg")` case
+                exc_info = sys.exc_info()
+                if exc_info[0] is None:
+                    exc_info = None
+
+            if (
+                isinstance(exc_info, tuple)
+                and len(exc_info) == 3
+                and isinstance(exc_info[1], BaseExceptionGroup)
+            ):
+                group = exc_info[1]
+            elif isinstance(exc_info, BaseExceptionGroup):
+                group = exc_info
+
+            if group:
+                # Only remove it from event_dict if we handle it
+                del event_dict["exc_info"]
+                event_dict["exception"] = list(
+                    itertools.chain.from_iterable(
+                        format_exception((type(exc), exc, exc.__traceback__))  # type: ignore[attr-defined,arg-type]
+                        for exc in (*group.exceptions, group)
+                    )
+                )
+
+        return event_dict
+
+    return _exception_group_tracebacks
+
+
+# `eyJ` is `{"` in base64 encoding -- i.e. any value that starts like that is in all likelihood a JWT
+# token. Better safe than sorry.
+def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
+    for k, v in event_dict.items():
+        if isinstance(v, str) and v.startswith("eyJ"):
+            event_dict[k] = "eyJ***"
+    return event_dict
+
+
+def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
+    event_dict.pop("positional_args", None)
+    return event_dict
+
+
+@cache
+def structlog_processors(
+    json_output: bool,
+):
+    if json_output:
+        timestamper = structlog.processors.MaybeTimeStamper(fmt="iso")
+    else:
+        timestamper = structlog.processors.MaybeTimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f")
+
+    processors: list[structlog.typing.Processor] = [
+        timestamper,
+        structlog.contextvars.merge_contextvars,
+        structlog.processors.add_log_level,
+        structlog.stdlib.PositionalArgumentsFormatter(),
+        logger_name,
+        redact_jwt,
+        structlog.processors.StackInfoRenderer(),
+    ]
+
+    # Imports to suppress showing code from these modules. We need the import to get the filepath for
+    # structlog to ignore.
+    import contextlib
+
+    import click
+    import httpcore
+    import httpx
+
+    suppress = (
+        click,
+        contextlib,
+        httpx,
+        httpcore,
+    )
+
+    if json_output:
+        dict_exc_formatter = structlog.tracebacks.ExceptionDictTransformer(
+            use_rich=False, show_locals=False, suppress=suppress
+        )
+
+        dict_tracebacks = structlog.processors.ExceptionRenderer(dict_exc_formatter)
+        if hasattr(__builtins__, "BaseExceptionGroup"):
+            exc_group_processor = exception_group_tracebacks(dict_exc_formatter)
+            processors.append(exc_group_processor)
+        else:
+            exc_group_processor = None
+
+        import msgspec
+
+        def json_dumps(msg, default):
+            # Note: this is likely an "expensive" step, but let's massage the dict order for nice
+            # viewing of the raw JSON logs.
+            # Maybe we don't need this once the UI renders the JSON instead of displaying the raw text
+            msg = {
+                "timestamp": msg.pop("timestamp"),
+                "level": msg.pop("level"),
+                "event": msg.pop("event"),
+                **msg,
+            }
+            return msgspec.json.encode(msg, enc_hook=default)
+
+        def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
+            # Stdlib logging doesn't need the re-ordering, it's fine as it is
+            return msgspec.json.encode(event_dict).decode("utf-8")
+
+        json = structlog.processors.JSONRenderer(serializer=json_dumps)
+
+        processors.extend(
+            (
+                dict_tracebacks,
+                structlog.processors.UnicodeDecoder(),
+                json,
+            ),
+        )
+
+        return processors, {
+            "timestamper": timestamper,
+            "exc_group_processor": exc_group_processor,
+            "dict_tracebacks": dict_tracebacks,
+            "json": json_processor,
+        }
+    else:
+        rich_exc_formatter = structlog.dev.RichTracebackFormatter(
+            # These values are picked somewhat arbitrarily to produce useful-but-compact tracebacks. If
+            # we ever need to change these then they should be configurable.
+            extra_lines=0,
+            max_frames=30,
+            indent_guides=False,
+            suppress=suppress,
+        )
+        my_styles = structlog.dev.ConsoleRenderer.get_default_level_styles()
+        my_styles["debug"] = structlog.dev.CYAN
+
+        console = structlog.dev.ConsoleRenderer(
+            exception_formatter=rich_exc_formatter, level_styles=my_styles
+        )
+        processors.append(console)
+        return processors, {
+            "timestamper": timestamper,
+            "console": console,
+        }
+
+
+@cache
+def configure_structlog(
+    json_output: bool = False,
+    log_level: str = "DEBUG",
+    cache_logger_on_first_use: bool = True,
+):
+    """Set up structlog and stdlib logging config."""
+    lvl = NAME_TO_LEVEL[log_level.lower()]
+
+    if json_output:
+        formatter = "plain"
+    else:
+        formatter = "colored"
+    processors, named = structlog_processors(json_output)
+    timestamper = named["timestamper"]
+
+    pre_chain: list[structlog.typing.Processor] = [
+        # Add the log level and a timestamp to the event_dict if the log entry
+        # is not from structlog.
+        structlog.stdlib.add_log_level,
+        structlog.stdlib.add_logger_name,
+        timestamper,
+        structlog.contextvars.merge_contextvars,
+        redact_jwt,
+    ]
+
+    # Don't cache the loggers during tests, it makes it hard to capture them
+    if "PYTEST_CURRENT_TEST" in os.environ:
+        cache_logger_on_first_use = False
+
+    color_formatter: list[structlog.typing.Processor] = [
+        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+        drop_positional_args,
+    ]
+    std_lib_formatter: list[structlog.typing.Processor] = [
+        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+        drop_positional_args,
+    ]
+
+    wrapper_class = make_filtering_logger(lvl)
+    if json_output:
+        structlog.configure(
+            processors=processors,
+            cache_logger_on_first_use=cache_logger_on_first_use,
+            wrapper_class=wrapper_class,
+            logger_factory=LoggerFactory(NamedBytesLogger),
+        )
+
+        if processor := named["exc_group_processor"]:
+            pre_chain.append(processor)
+        pre_chain.append(named["dict_tracebacks"])
+        color_formatter.append(named["json"])
+        std_lib_formatter.append(named["json"])
+    else:
+        structlog.configure(
+            processors=processors,
+            cache_logger_on_first_use=cache_logger_on_first_use,
+            wrapper_class=wrapper_class,
+            logger_factory=LoggerFactory(NamedWriteLogger),
+        )
+        color_formatter.append(named["console"])
+
+    global _warnings_showwarning
+
+    if _warnings_showwarning is None:
+        _warnings_showwarning = warnings.showwarning
+        # Capture warnings and show them via structlog
+        warnings.showwarning = _showwarning
+
+    import logging.config
+
+    logging.config.dictConfig(
+        {
+            "version": 1,
+            "disable_existing_loggers": False,
+            "formatters": {
+                "plain": {
+                    "()": structlog.stdlib.ProcessorFormatter,
+                    "processors": std_lib_formatter,
+                    "foreign_pre_chain": pre_chain,
+                    "pass_foreign_args": True,
+                },
+                "colored": {
+                    "()": structlog.stdlib.ProcessorFormatter,
+                    "processors": color_formatter,
+                    "foreign_pre_chain": pre_chain,
+                    "pass_foreign_args": True,
+                },
+            },
+            "handlers": {
+                "default": {
+                    "level": log_level.upper(),
+                    "class": "logging.StreamHandler",
+                    "formatter": formatter,
+                },
+            },
+            "loggers": {
+                # Set Airflow logging to the requested level, but almost everything else at "INFO"
+                "": {
+                    "handlers": ["default"],
+                    "level": "INFO",
+                    "propagate": True,
+                },
+                "airflow": {"level": log_level.upper()},
+                # These ones are too chatty even at info
+                "httpx": {"level": "WARN"},
+                "sqlalchemy.engine": {"level": "WARN"},
+            },
+        }
+    )
+
+
+_warnings_showwarning: Any = None
+
+
+def _showwarning(
+    message: Warning | str,
+    category: type[Warning],
+    filename: str,
+    lineno: int,
+    file: TextIO | None = None,
+    line: str | None = None,
+) -> Any:
+    """
+    Redirect warnings to structlog so they appear in task logs etc.
+
+    Implementation of showwarning which redirects to structlog. It will first
+    check to see if the file parameter is None. If a file is specified, it will
+    delegate to the original warnings implementation of showwarning. Otherwise,
+    it will log the warning message to a structlog logger named "py.warnings"
+    at warning level.
+    """
+    if file is not None:
+        if _warnings_showwarning is not None:
+            _warnings_showwarning(message, category, filename, lineno, file, line)
+    else:
+        log = structlog.get_logger(logger_name="py.warnings")
+        log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)
diff --git a/hatch_build.py b/hatch_build.py
index 6b8294d1d75..3c757ae794f 100644
--- a/hatch_build.py
+++ b/hatch_build.py
@@ -255,6 +255,7 @@ DEPENDENCIES = [
     # Pygments 2.19.0 improperly renders .ini files with dictionaries as values
     # See https://github.com/pygments/pygments/issues/2834
     "pygments>=2.0.1,!=2.19.0",
+    "pygtrie>=2.5.0,<3.0",
     "pyjwt>=2.0.0",
     "python-daemon>=3.0.0",
     "python-dateutil>=2.7.0",
@@ -274,6 +275,7 @@ DEPENDENCIES = [
     "sqlalchemy>=1.4.49,<2.0",
     "sqlalchemy-jsonfield>=1.0",
     "sqlalchemy-utils>=0.41.2",
+    "structlog>=25.1.0",
     "tabulate>=0.7.5",
     "tenacity>=8.0.0,!=8.2.0",
     "termcolor>=2.5.0",
diff --git a/task_sdk/pyproject.toml b/task_sdk/pyproject.toml
index d980c0aae82..624a64a3258 100644
--- a/task_sdk/pyproject.toml
+++ b/task_sdk/pyproject.toml
@@ -32,7 +32,7 @@ dependencies = [
     'pendulum>=3.0.0,<4.0;python_version>="3.12"',
     "python-dateutil>=2.7.0",
     "psutil>=6.1.0",
-    "structlog>=24.4.0",
+    "structlog>=25.1.0",
     "retryhttp>=1.2.0,!=1.3.0",
 ]
 classifiers = [
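
A minimal usage sketch (illustrative only, not part of the commit above) of how the new
airflow._logging.structlog module could be exercised. The logger name "airflow.example" and the
log calls are made up for the example, and the level-filtering comments rely on the placeholder
PER_LOGGER_LEVELS entries in this WIP:

    from __future__ import annotations

    import logging

    import structlog

    from airflow._logging.structlog import configure_structlog

    # Configure structlog and stdlib logging together; json_output=True would emit
    # msgspec-encoded JSON lines instead of colored console output.
    configure_structlog(json_output=False, log_level="INFO")

    # The name passed to get_logger() is matched against PER_LOGGER_LEVELS by longest dotted
    # prefix, so "airflow.example" picks up the "airflow" entry (info) in this WIP.
    log = structlog.get_logger("airflow.example")
    log.info("task started", try_number=1)  # structured key/value pairs
    log.debug("filtered out by the bound logger class at this level")

    # Existing stdlib call sites keep working; they are rendered through
    # structlog.stdlib.ProcessorFormatter via the dictConfig set up above.
    logging.getLogger("airflow.example").warning("stdlib message, same output format")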
