This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch structlog-in-logging-mixin
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 54c87c691810dfa6297f93d2d792887e9a8e349d Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Thu Mar 6 15:06:53 2025 +0000 WIP: switch all airflow logging to structlog --- airflow-core/pyproject.toml | 2 + airflow-core/src/airflow/_logging/__init__.py | 16 + airflow-core/src/airflow/_logging/structlog.py | 472 +++++++++++++++++++++ .../src/airflow/jobs/triggerer_job_runner.py | 5 +- airflow-core/src/airflow/logging_config.py | 9 +- .../airflow/utils/log/file_processor_handler.py | 4 +- .../src/airflow/utils/log/logging_mixin.py | 19 +- airflow-core/src/airflow/utils/retries.py | 2 +- .../tests/unit/always/test_providers_manager.py | 9 +- .../tests/unit/core/test_logging_config.py | 344 --------------- airflow-core/tests/unit/core/test_stats.py | 8 +- .../unit/sensors/test_external_task_sensor.py | 18 +- .../tests/unit/utils/log/test_log_reader.py | 28 +- airflow-core/tests/unit/utils/test_log_handlers.py | 6 +- .../tests/unit/utils/test_logging_mixin.py | 12 - .../tests/unit/utils/test_process_utils.py | 11 +- .../test_task_handler_with_custom_formatter.py | 115 ----- devel-common/src/tests_common/pytest_plugin.py | 70 ++- devel-common/src/tests_common/test_utils/logs.py | 191 +++++++++ 19 files changed, 775 insertions(+), 566 deletions(-) diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml index d3e46ce4106..d245204123c 100644 --- a/airflow-core/pyproject.toml +++ b/airflow-core/pyproject.toml @@ -109,6 +109,7 @@ dependencies = [ # See https://github.com/pygments/pygments/issues/2834 "pygments>=2.0.1,!=2.19.0", "pyjwt>=2.10.0", + "pygtrie>=2.5.0", "python-daemon>=3.0.0", "python-dateutil>=2.7.0", "python-slugify>=5.0", @@ -124,6 +125,7 @@ dependencies = [ "sqlalchemy>=1.4.49,<2.0", "sqlalchemy-jsonfield>=1.0", "sqlalchemy-utils>=0.41.2", + "structlog>=25.1.0", "svcs>=25.1.0", "tabulate>=0.7.5", "tenacity>=8.0.0,!=8.2.0", diff --git a/airflow-core/src/airflow/_logging/__init__.py b/airflow-core/src/airflow/_logging/__init__.py new file mode 100644 index 00000000000..13a83393a91 --- /dev/null +++ b/airflow-core/src/airflow/_logging/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/src/airflow/_logging/structlog.py b/airflow-core/src/airflow/_logging/structlog.py new file mode 100644 index 00000000000..be64336e80f --- /dev/null +++ b/airflow-core/src/airflow/_logging/structlog.py @@ -0,0 +1,472 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import itertools +import logging +import os +import sys +import warnings +from functools import cache +from typing import Any, BinaryIO, Callable, Generic, Protocol, TextIO, TypeVar, Union, cast + +import pygtrie +import structlog +from structlog.processors import NAME_TO_LEVEL +from structlog.typing import ( + BindableLogger, + EventDict, + ExcInfo, + FilteringBoundLogger, + Processor, + WrappedLogger, +) + +log = logging.getLogger(__name__) + + +class AirflowFilteringBoundLogger(FilteringBoundLogger, Protocol): + def isEnabledFor(self, level: int): ... + def getEffectiveLevel(self) -> int: ... + + name: str + + +LEVEL_TO_FILTERING_LOGGER: dict[int, type[AirflowFilteringBoundLogger]] = {} + + +def _make_airflow_structlogger(min_level): + # This uses https://github.com/hynek/structlog/blob/2f0cc42d/src/structlog/_native.py#L126 + # as inspiration + + LEVEL_TO_NAME = {v: k for k, v in NAME_TO_LEVEL.items()} + + def isEnabledFor(self: Any, level): + return self.is_enabled_for(level) + + def getEffectiveLevel(self: Any): + return self.get_effective_level() + + @property + def name(self): + return self._logger.name + + base = structlog.make_filtering_bound_logger(min_level) + + cls = type( + f"AirflowBoundLoggerFilteringAt{LEVEL_TO_NAME.get(min_level, 'Notset').capitalize()}", + (base,), + { + "isEnabledFor": isEnabledFor, + "getEffectiveLevel": getEffectiveLevel, + "name": name, + }, + ) + LEVEL_TO_FILTERING_LOGGER[min_level] = cls + return cls + + +AirflowBoundLoggerFilteringAtNotset = _make_airflow_structlogger(NAME_TO_LEVEL["notset"]) +AirflowBoundLoggerFilteringAtDebug = _make_airflow_structlogger(NAME_TO_LEVEL["debug"]) +AirflowBoundLoggerFilteringAtInfo = _make_airflow_structlogger(NAME_TO_LEVEL["info"]) +AirflowBoundLoggerFilteringAtWarning = _make_airflow_structlogger(NAME_TO_LEVEL["warning"]) +AirflowBoundLoggerFilteringAtError = _make_airflow_structlogger(NAME_TO_LEVEL["error"]) +AirflowBoundLoggerFilteringAtCritical = _make_airflow_structlogger(NAME_TO_LEVEL["critical"]) + +# We use a trie structure so that we can easily and quickly find the most suitable log level to apply. +# This mirrors the logging level cascade behavior from stdlib logging, without the complexity of multiple +# handlers etc +PER_LOGGER_LEVELS = pygtrie.StringTrie(separator=".") +PER_LOGGER_LEVELS.update( + { + # Top level logging default - changed to respect config in `configure_structlog` + "": NAME_TO_LEVEL["info"], + } +) + + +def make_filtering_logger() -> Callable[..., BindableLogger]: + def maker(logger: WrappedLogger, *args, **kwargs): + # If the logger is a NamedBytesLogger/NamedWriteLogger (an Airflow specific subclass) then + # look up the global per-logger config and redirect to a new class. 
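+        # For example: with PER_LOGGER_LEVELS = {"": INFO, "airflow": DEBUG}, a logger named
+        # "airflow.models.dag" picks up DEBUG via its longest matching prefix ("airflow"), while a
+        # name with no matching prefix falls back to the root ("") level.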
+        logger_name = kwargs.get("context", {}).get("logger_name")
+        if not logger_name and isinstance(logger, (NamedWriteLogger, NamedBytesLogger)):
+            logger_name = logger.name
+
+        if logger_name:
+            level = PER_LOGGER_LEVELS.longest_prefix(logger_name).get(PER_LOGGER_LEVELS[""])
+        else:
+            level = PER_LOGGER_LEVELS[""]
+        return LEVEL_TO_FILTERING_LOGGER[level](logger, *args, **kwargs)  # type: ignore[call-arg]
+
+    return maker
+
+
+class NamedBytesLogger(structlog.BytesLogger):
+    __slots__ = ("name",)
+
+    def __init__(self, name: str | None = None, file: BinaryIO | None = None):
+        self.name = name
+        super().__init__(file)
+
+
+class NamedWriteLogger(structlog.WriteLogger):
+    __slots__ = ("name",)
+
+    def __init__(self, name: str | None = None, file: TextIO | None = None):
+        self.name = name
+        super().__init__(file)
+
+
+LogOutputType = TypeVar("LogOutputType", bound=Union[TextIO, BinaryIO])
+
+
+class LoggerFactory(Generic[LogOutputType]):
+    def __init__(
+        self,
+        cls: Callable[[str | None, LogOutputType | None], WrappedLogger],
+        io: LogOutputType | None = None,
+    ):
+        self.cls = cls
+        self.io = io
+
+    def __call__(self, logger_name: str | None = None, *args: Any) -> WrappedLogger:
+        return self.cls(logger_name, self.io)
+
+
+def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
+    if logger_name := (event_dict.pop("logger_name", None) or getattr(logger, "name", None)):
+        event_dict.setdefault("logger", logger_name)
+    return event_dict
+
+
+def exception_group_tracebacks(
+    format_exception: Callable[[ExcInfo], list[dict[str, Any]]],
+) -> Processor:
+    # Make mypy happy
+    if not hasattr(__builtins__, "BaseExceptionGroup"):
+        T = TypeVar("T")
+
+        class BaseExceptionGroup(Generic[T]):
+            exceptions: list[T]
+
+    def _exception_group_tracebacks(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
+        if exc_info := event_dict.get("exc_info", None):
+            group: BaseExceptionGroup[Exception] | None = None
+            if exc_info is True:
+                # The `log.exception("mesg")` case
+                exc_info = sys.exc_info()
+                if exc_info[0] is None:
+                    exc_info = None
+
+            if (
+                isinstance(exc_info, tuple)
+                and len(exc_info) == 3
+                and isinstance(exc_info[1], BaseExceptionGroup)
+            ):
+                group = exc_info[1]
+            elif isinstance(exc_info, BaseExceptionGroup):
+                group = exc_info
+
+            if group:
+                # Only remove it from event_dict if we handle it
+                del event_dict["exc_info"]
+                event_dict["exception"] = list(
+                    itertools.chain.from_iterable(
+                        format_exception((type(exc), exc, exc.__traceback__))  # type: ignore[attr-defined,arg-type]
+                        for exc in (*group.exceptions, group)
+                    )
+                )
+
+        return event_dict
+
+    return _exception_group_tracebacks
+
+
+# `eyJ` is `{"` in base64 encoding -- and any value that starts like that is in all likelihood a JWT
+# token. Better safe than sorry.
+def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
+    for k, v in event_dict.items():
+        if isinstance(v, str) and v.startswith("eyJ"):
+            event_dict[k] = "eyJ***"
+    return event_dict
+
+
+def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
+    event_dict.pop("positional_args", None)
+    return event_dict
+
+
+@cache
+def structlog_processors(
+    json_output: bool,
+):
+    if json_output:
+        timestamper = structlog.processors.MaybeTimeStamper(fmt="iso")
+    else:
+        timestamper = structlog.processors.MaybeTimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f")
+
+    processors: list[structlog.typing.Processor] = [
+        timestamper,
+        structlog.contextvars.merge_contextvars,
+        structlog.processors.add_log_level,
+        structlog.stdlib.PositionalArgumentsFormatter(),
+        logger_name,
+        redact_jwt,
+        structlog.processors.StackInfoRenderer(),
+    ]
+
+    # Imports to suppress showing code from these modules. We need the import to get the filepath for
+    # structlog to ignore.
+    import contextlib
+
+    import click
+    import httpcore
+    import httpx
+
+    suppress = (
+        click,
+        contextlib,
+        httpcore,
+        httpx,
+    )
+
+    if json_output:
+        dict_exc_formatter = structlog.tracebacks.ExceptionDictTransformer(
+            use_rich=False, show_locals=False, suppress=suppress
+        )
+
+        dict_tracebacks = structlog.processors.ExceptionRenderer(dict_exc_formatter)
+        if hasattr(__builtins__, "BaseExceptionGroup"):
+            exc_group_processor = exception_group_tracebacks(dict_exc_formatter)
+            processors.append(exc_group_processor)
+        else:
+            exc_group_processor = None
+
+        import msgspec
+
+        def json_dumps(msg, default):
+            # Note: this is likely an "expensive" step, but let's massage the dict order for nice
+            # viewing of the raw JSON logs.
+            # Maybe we don't need this once the UI renders the JSON instead of displaying the raw text
+            msg = {
+                "timestamp": msg.pop("timestamp"),
+                "level": msg.pop("level"),
+                "event": msg.pop("event"),
+                **msg,
+            }
+            return msgspec.json.encode(msg, enc_hook=default)
+
+        def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
+            # Stdlib logging doesn't need the re-ordering; it's fine as it is
+            return msgspec.json.encode(event_dict).decode("utf-8")
+
+        json = structlog.processors.JSONRenderer(serializer=json_dumps)
+
+        processors.extend(
+            (
+                dict_tracebacks,
+                structlog.processors.UnicodeDecoder(),
+                json,
+            ),
+        )
+
+        return processors, {
+            "timestamper": timestamper,
+            "exc_group_processor": exc_group_processor,
+            "dict_tracebacks": dict_tracebacks,
+            "json": json_processor,
+        }
+    else:
+        rich_exc_formatter = structlog.dev.RichTracebackFormatter(
+            # These values are picked somewhat arbitrarily to produce useful-but-compact tracebacks. If
+            # we ever need to change these then they should be configurable.
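+            # extra_lines=0 drops the source-context lines around each frame, and suppress= hides
+            # frames from the modules imported above, keeping tracebacks focused on Airflow/user code.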
+            extra_lines=0,
+            max_frames=30,
+            indent_guides=False,
+            suppress=suppress,
+        )
+        my_styles = structlog.dev.ConsoleRenderer.get_default_level_styles()
+        my_styles["debug"] = structlog.dev.CYAN
+
+        console = structlog.dev.ConsoleRenderer(
+            exception_formatter=rich_exc_formatter, level_styles=my_styles
+        )
+        processors.append(console)
+        return processors, {
+            "timestamper": timestamper,
+            "console": console,
+        }
+
+
+def configure_structlog(
+    json_output: bool = False,
+    log_level: str = "DEBUG",
+    cache_logger_on_first_use: bool = True,
+    stdlib_config: dict | None = None,
+):
+    """Set up structlog and stdlib logging config."""
+    if "fatal" not in NAME_TO_LEVEL:
+        NAME_TO_LEVEL["fatal"] = NAME_TO_LEVEL["critical"]
+
+    stdlib_config = stdlib_config or {}
+
+    PER_LOGGER_LEVELS[""] = NAME_TO_LEVEL[log_level.lower()]
+
+    if json_output:
+        formatter = "plain"
+    else:
+        formatter = "colored"
+    processors, named = structlog_processors(json_output)
+    timestamper = named["timestamper"]
+
+    pre_chain: list[structlog.typing.Processor] = [
+        # Add the log level and a timestamp to the event_dict if the log entry
+        # is not from structlog.
+        structlog.stdlib.add_log_level,
+        structlog.stdlib.add_logger_name,
+        timestamper,
+        structlog.contextvars.merge_contextvars,
+        redact_jwt,
+    ]
+
+    # Don't cache the loggers during tests; it makes it hard to capture them
+    if "PYTEST_VERSION" in os.environ:
+        cache_logger_on_first_use = False
+
+    color_formatter: list[structlog.typing.Processor] = [
+        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+        drop_positional_args,
+    ]
+    std_lib_formatter: list[structlog.typing.Processor] = [
+        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+        drop_positional_args,
+    ]
+
+    wrapper_class = cast(type[BindableLogger], make_filtering_logger())
+    if json_output:
+        structlog.configure(
+            processors=processors,
+            cache_logger_on_first_use=cache_logger_on_first_use,
+            wrapper_class=wrapper_class,
+            logger_factory=LoggerFactory(NamedBytesLogger),  # type: ignore[type-var]
+        )
+
+        if processor := named["exc_group_processor"]:
+            pre_chain.append(processor)
+        pre_chain.append(named["dict_tracebacks"])
+        color_formatter.append(named["json"])
+        std_lib_formatter.append(named["json"])
+    else:
+        structlog.configure(
+            processors=processors,
+            cache_logger_on_first_use=cache_logger_on_first_use,
+            wrapper_class=wrapper_class,
+            logger_factory=LoggerFactory(NamedWriteLogger),  # type: ignore[type-var]
+        )
+        color_formatter.append(named["console"])
+
+    global _warnings_showwarning
+
+    if _warnings_showwarning is None:
+        _warnings_showwarning = warnings.showwarning
+        # Capture warnings and show them via structlog
+        warnings.showwarning = _showwarning
+
+    import logging.config
+
+    config = {**stdlib_config}
+    config["formatters"] = {**config["formatters"]}
+    config["handlers"] = {**config["handlers"]}
+    config["loggers"] = {**config["loggers"]}
+    config["formatters"].update(
+        {
+            "plain": {
+                "()": structlog.stdlib.ProcessorFormatter,
+                "processors": std_lib_formatter,
+                "foreign_pre_chain": pre_chain,
+                "pass_foreign_args": True,
+            },
+            "colored": {
+                "()": structlog.stdlib.ProcessorFormatter,
+                "processors": color_formatter,
+                "foreign_pre_chain": pre_chain,
+                "pass_foreign_args": True,
+            },
+        }
+    )
+    config["handlers"].update(
+        {
+            "default": {
+                "level": log_level.upper(),
+                "class": "logging.StreamHandler",
+                "formatter": formatter,
+            },
+        }
+    )
+    for log_config in config["loggers"].values():
+        # We want everything to go via structlog; remove whatever the user might have configured
+        log_config.pop("stream", None)
+        log_config.pop("formatter", None)
+        # log_config.pop("handlers", None)
+    config["loggers"].update(
+        {
+            # Set Airflow logging to the level requested, but leave almost everything else at "INFO"
+            "airflow": {"level": log_level.upper()},
+            # These ones are too chatty even at info
+            "httpx": {"level": "WARN"},
+            "sqlalchemy.engine": {"level": "WARN"},
+        }
+    )
+    config["root"] = {
+        "handlers": ["default"],
+        "level": "INFO",
+        "propagate": True,
+    }
+
+    logging.config.dictConfig(config)
+
+
+_warnings_showwarning: Any = None
+
+
+def _showwarning(
+    message: Warning | str,
+    category: type[Warning],
+    filename: str,
+    lineno: int,
+    file: TextIO | None = None,
+    line: str | None = None,
+) -> Any:
+    """
+    Redirect warnings to structlog so they appear in task logs etc.
+
+    Implementation of ``warnings.showwarning`` which first checks whether the ``file``
+    parameter is None. If a file is specified, it delegates to the original warnings
+    implementation of ``showwarning``. Otherwise, it logs the warning via a structlog
+    logger named "py.warnings" at warning level.
+    """
+    if file is not None:
+        if _warnings_showwarning is not None:
+            _warnings_showwarning(message, category, filename, lineno, file, line)
+    else:
+        log = structlog.get_logger("py.warnings")
+        log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index bcf0bb9bb96..3d3a462dfef 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -547,7 +547,7 @@ class TriggerDetails(TypedDict):
     events: int
 
 
-class TriggerRunner:
+class TriggerRunner(LoggingMixin):
     """
     Runtime environment for all triggers.
@@ -579,9 +579,6 @@ class TriggerRunner: # TODO: set this in a sig-int handler stop: bool = False - # TODO: connect this to the parent process - log: FilteringBoundLogger = structlog.get_logger() - requests_sock: asyncio.StreamWriter def __init__(self): diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py index f0497c57409..0873b8b29bf 100644 --- a/airflow-core/src/airflow/logging_config.py +++ b/airflow-core/src/airflow/logging_config.py @@ -19,8 +19,8 @@ from __future__ import annotations import logging import warnings -from logging.config import dictConfig +from airflow._logging.structlog import configure_structlog from airflow.configuration import conf from airflow.exceptions import AirflowConfigException from airflow.utils.module_loading import import_string @@ -65,20 +65,21 @@ def configure_logging(): if "mask_secrets" not in task_handler_config["filters"]: task_handler_config["filters"].append("mask_secrets") + level: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper() # Try to init logging - dictConfig(logging_config) + configure_structlog(log_level=level, stdlib_config=logging_config) except (ValueError, KeyError) as e: log.error("Unable to load the config, contains a configuration error.") # When there is an error in the config, escalate the exception # otherwise Airflow would silently fall back on the default config raise e - validate_logging_config(logging_config) + validate_logging_config() return logging_class_path -def validate_logging_config(logging_config): +def validate_logging_config(): """Validate the provided Logging Config.""" # Now lets validate the other logging-related settings task_log_reader = conf.get("logging", "task_log_reader") diff --git a/airflow-core/src/airflow/utils/log/file_processor_handler.py b/airflow-core/src/airflow/utils/log/file_processor_handler.py index 5dbd034cea8..cf0cec6ed83 100644 --- a/airflow-core/src/airflow/utils/log/file_processor_handler.py +++ b/airflow-core/src/airflow/utils/log/file_processor_handler.py @@ -25,7 +25,7 @@ from pathlib import Path from airflow import settings from airflow.utils import timezone from airflow.utils.helpers import parse_template_string -from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE +from airflow.utils.log.logging_mixin import SetContextPropagate from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler logger = logging.getLogger(__name__) @@ -69,7 +69,7 @@ class FileProcessorHandler(logging.Handler): self._symlink_latest_log_directory() self._cur_date = datetime.today() - return DISABLE_PROPOGATE + return SetContextPropagate.DISABLE_PROPAGATE def emit(self, record): if self.handler is not None: diff --git a/airflow-core/src/airflow/utils/log/logging_mixin.py b/airflow-core/src/airflow/utils/log/logging_mixin.py index d06ba0b7d34..785503ae13e 100644 --- a/airflow-core/src/airflow/utils/log/logging_mixin.py +++ b/airflow-core/src/airflow/utils/log/logging_mixin.py @@ -19,15 +19,16 @@ from __future__ import annotations import abc import enum -import logging import re import sys from io import TextIOBase, UnsupportedOperation from logging import Handler, StreamHandler from typing import IO, TYPE_CHECKING, Any, Optional, TypeVar, cast +import structlog + if TYPE_CHECKING: - from logging import Logger + from airflow._logging.structlog import AirflowFilteringBoundLogger # 7-bit C1 ANSI escape sequences ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]") @@ -67,7 +68,7 @@ _T = TypeVar("_T") class LoggingMixin: 
"""Convenience super-class to have a logger configured with the class name.""" - _log: logging.Logger | None = None + _log: AirflowFilteringBoundLogger | None = None # Parent logger used by this class. It should match one of the loggers defined in the # `logging_config_class`. By default, this attribute is used to create the final name of the logger, and @@ -104,29 +105,27 @@ class LoggingMixin: return logger_name @classmethod - def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger: + def _get_log(cls, obj: Any, clazz: type[_T]) -> AirflowFilteringBoundLogger: if obj._log is None: logger_name: str = cls._create_logger_name( logged_class=clazz, log_config_logger_name=obj._log_config_logger_name, class_logger_name=obj._logger_name, ) - obj._log = logging.getLogger(logger_name) + obj._log = structlog.get_logger(logger_name) return obj._log @classmethod - def logger(cls) -> Logger: + def logger(cls) -> AirflowFilteringBoundLogger: """Return a logger.""" return LoggingMixin._get_log(cls, cls) @property - def log(self) -> Logger: + def log(self) -> AirflowFilteringBoundLogger: """Return a logger.""" return LoggingMixin._get_log(self, self.__class__) - def _set_context(self, context): - if context is not None: - set_context(self.log, context) + def _set_context(self, context): ... class ExternalLoggingMixin: diff --git a/airflow-core/src/airflow/utils/retries.py b/airflow-core/src/airflow/utils/retries.py index 809d176ef6c..dad8af58fd3 100644 --- a/airflow-core/src/airflow/utils/retries.py +++ b/airflow-core/src/airflow/utils/retries.py @@ -42,7 +42,7 @@ def run_with_db_retries(max_retries: int = MAX_DB_RETRIES, logger: logging.Logge reraise=True, **kwargs, ) - if logger and isinstance(logger, logging.Logger): + if logger is not None: retry_kwargs["before_sleep"] = tenacity.before_sleep_log(logger, logging.DEBUG, True) return tenacity.Retrying(**retry_kwargs) diff --git a/airflow-core/tests/unit/always/test_providers_manager.py b/airflow-core/tests/unit/always/test_providers_manager.py index feb6548e997..396c910f381 100644 --- a/airflow-core/tests/unit/always/test_providers_manager.py +++ b/airflow-core/tests/unit/always/test_providers_manager.py @@ -115,8 +115,8 @@ class TestProviderManager: ) providers_manager._discover_hooks() _ = providers_manager._hooks_lazy_dict["wrong-connection-type"] - assert len(self._caplog.records) == 1 - assert "Inconsistency!" in self._caplog.records[0].message + assert len(self._caplog.entries) == 1 + assert "Inconsistency!" in self._caplog[0]["event"] assert "sftp" not in providers_manager.hooks def test_warning_logs_not_generated(self): @@ -160,11 +160,12 @@ class TestProviderManager: providers_manager._discover_hooks() _ = providers_manager._hooks_lazy_dict["dummy"] assert len(self._caplog.records) == 1 - assert "The connection type 'dummy' is already registered" in self._caplog.records[0].message + msg = self._caplog.messages[0] + assert msg.startswith("The connection type 'dummy' is already registered") assert ( "different class names: 'airflow.providers.dummy.hooks.dummy.DummyHook'" " and 'airflow.providers.dummy.hooks.dummy.DummyHook2'." 
- ) in self._caplog.records[0].message + ) in msg def test_providers_manager_register_plugins(self): providers_manager = ProvidersManager() diff --git a/airflow-core/tests/unit/core/test_logging_config.py b/airflow-core/tests/unit/core/test_logging_config.py deleted file mode 100644 index 29dae7ff712..00000000000 --- a/airflow-core/tests/unit/core/test_logging_config.py +++ /dev/null @@ -1,344 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import contextlib -import importlib -import logging -import os -import pathlib -import sys -import tempfile -from unittest.mock import patch - -import pytest - -from airflow.configuration import conf - -from tests_common.test_utils.config import conf_vars - -SETTINGS_FILE_VALID = """ -LOGGING_CONFIG = { - 'version': 1, - 'disable_existing_loggers': False, - 'formatters': { - 'airflow.task': { - 'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s' - }, - }, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', - 'formatter': 'airflow.task', - 'stream': 'ext://sys.stdout' - }, - 'task': { - 'class': 'logging.StreamHandler', - 'formatter': 'airflow.task', - 'stream': 'ext://sys.stdout' - }, - }, - 'loggers': { - 'airflow.task': { - 'handlers': ['task'], - 'level': 'INFO', - 'propagate': False, - }, - } -} -""" - -SETTINGS_FILE_INVALID = """ -LOGGING_CONFIG = { - 'version': 1, - 'disable_existing_loggers': False, - 'formatters': { - 'airflow.task': { - 'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s' - }, - }, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', - 'formatter': 'airflow.task', - 'stream': 'ext://sys.stdout' - } - }, - 'loggers': { - 'airflow': { - 'handlers': ['file.handler'], # this handler does not exists - 'level': 'INFO', - 'propagate': False - } - } -} -""" - -SETTINGS_FILE_EMPTY = """ -# Other settings here -""" - -SETTINGS_DEFAULT_NAME = "custom_airflow_local_settings" - - -def reset_logging(): - """Reset Logging""" - manager = logging.root.manager - manager.disabled = logging.NOTSET - airflow_loggers = [ - logger for logger_name, logger in manager.loggerDict.items() if logger_name.startswith("airflow") - ] - for logger in airflow_loggers: - if isinstance(logger, logging.Logger): - logger.setLevel(logging.NOTSET) - logger.propagate = True - logger.disabled = False - logger.filters.clear() - handlers = logger.handlers.copy() - for handler in handlers: - # Copied from `logging.shutdown`. 
- try: - handler.acquire() - handler.flush() - handler.close() - except (OSError, ValueError): - pass - finally: - handler.release() - logger.removeHandler(handler) - - [email protected] -def settings_context(content, directory=None, name="LOGGING_CONFIG"): - """ - Sets a settings file and puts it in the Python classpath - - :param content: - The content of the settings file - :param directory: the directory - :param name: str - """ - initial_logging_config = os.environ.get("AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS", "") - try: - settings_root = tempfile.mkdtemp() - filename = f"{SETTINGS_DEFAULT_NAME}.py" - if directory: - # Create the directory structure with __init__.py - dir_path = os.path.join(settings_root, directory) - pathlib.Path(dir_path).mkdir(parents=True, exist_ok=True) - - basedir = settings_root - for part in directory.split("/"): - open(os.path.join(basedir, "__init__.py"), "w").close() - basedir = os.path.join(basedir, part) - open(os.path.join(basedir, "__init__.py"), "w").close() - - # Replace slashes by dots - module = directory.replace("/", ".") + "." + SETTINGS_DEFAULT_NAME + "." + name - settings_file = os.path.join(dir_path, filename) - else: - module = SETTINGS_DEFAULT_NAME + "." + name - settings_file = os.path.join(settings_root, filename) - - with open(settings_file, "w") as handle: - handle.writelines(content) - sys.path.append(settings_root) - - # Using environment vars instead of conf_vars so value is accessible - # to parent and child processes when using 'spawn' for multiprocessing. - os.environ["AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS"] = module - yield settings_file - - finally: - os.environ["AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS"] = initial_logging_config - sys.path.remove(settings_root) - - -class TestLoggingSettings: - # Make sure that the configure_logging is not cached - def setup_method(self): - self.old_modules = dict(sys.modules) - - def teardown_method(self): - # Remove any new modules imported during the test run. This lets us - # import the same source files for more than one test. - from airflow.config_templates import airflow_local_settings - from airflow.logging_config import configure_logging - - for mod in list(sys.modules): - if mod not in self.old_modules: - del sys.modules[mod] - - reset_logging() - importlib.reload(airflow_local_settings) - configure_logging() - - # When we try to load an invalid config file, we expect an error - def test_loading_invalid_local_settings(self): - from airflow.logging_config import configure_logging, log - - with settings_context(SETTINGS_FILE_INVALID): - with patch.object(log, "error") as mock_info: - # Load config - with pytest.raises(ValueError): - configure_logging() - - mock_info.assert_called_once_with( - "Unable to load the config, contains a configuration error." 
- ) - - def test_loading_valid_complex_local_settings(self): - # Test what happens when the config is somewhere in a subfolder - module_structure = "etc.airflow.config" - dir_structure = module_structure.replace(".", "/") - with settings_context(SETTINGS_FILE_VALID, dir_structure): - from airflow.logging_config import configure_logging, log - - with patch.object(log, "info") as mock_info: - configure_logging() - mock_info.assert_called_once_with( - "Successfully imported user-defined logging config from %s", - f"etc.airflow.config.{SETTINGS_DEFAULT_NAME}.LOGGING_CONFIG", - ) - - # When we try to load a valid config - def test_loading_valid_local_settings(self): - with settings_context(SETTINGS_FILE_VALID): - from airflow.logging_config import configure_logging, log - - with patch.object(log, "info") as mock_info: - configure_logging() - mock_info.assert_called_once_with( - "Successfully imported user-defined logging config from %s", - f"{SETTINGS_DEFAULT_NAME}.LOGGING_CONFIG", - ) - - # When we load an empty file, it should go to default - def test_loading_no_local_settings(self): - with settings_context(SETTINGS_FILE_EMPTY): - from airflow.logging_config import configure_logging - - with pytest.raises(ImportError): - configure_logging() - - # When the key is not available in the configuration - def test_when_the_config_key_does_not_exists(self): - from airflow import logging_config - - with conf_vars({("logging", "logging_config_class"): None}): - with patch.object(logging_config.log, "debug") as mock_debug: - logging_config.configure_logging() - mock_debug.assert_any_call("Unable to load custom logging, using default config instead") - - # Just default - def test_loading_local_settings_without_logging_config(self): - from airflow.logging_config import configure_logging, log - - with patch.object(log, "debug") as mock_info: - configure_logging() - mock_info.assert_called_once_with("Unable to load custom logging, using default config instead") - - def test_1_9_config(self): - from airflow.logging_config import configure_logging - - with conf_vars({("logging", "task_log_reader"): "file.task"}): - with pytest.warns(DeprecationWarning, match=r"file.task"): - configure_logging() - assert conf.get("logging", "task_log_reader") == "task" - - def test_loading_remote_logging_with_wasb_handler(self): - """Test if logging can be configured successfully for Azure Blob Storage""" - pytest.importorskip( - "airflow.providers.microsoft.azure", reason="'microsoft.azure' provider not installed" - ) - from airflow.config_templates import airflow_local_settings - from airflow.logging_config import configure_logging - from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbTaskHandler - - with conf_vars( - { - ("logging", "remote_logging"): "True", - ("logging", "remote_log_conn_id"): "some_wasb", - ("logging", "remote_base_log_folder"): "wasb://some-folder", - } - ): - importlib.reload(airflow_local_settings) - configure_logging() - - logger = logging.getLogger("airflow.task") - assert isinstance(logger.handlers[0], WasbTaskHandler) - - @pytest.mark.parametrize( - "remote_base_log_folder, log_group_arn", - [ - ( - "cloudwatch://arn:aws:logs:aaaa:bbbbb:log-group:ccccc", - "arn:aws:logs:aaaa:bbbbb:log-group:ccccc", - ), - ( - "cloudwatch://arn:aws:logs:aaaa:bbbbb:log-group:aws/ccccc", - "arn:aws:logs:aaaa:bbbbb:log-group:aws/ccccc", - ), - ( - "cloudwatch://arn:aws:logs:aaaa:bbbbb:log-group:/aws/ecs/ccccc", - "arn:aws:logs:aaaa:bbbbb:log-group:/aws/ecs/ccccc", - ), - ], - ) - def 
test_log_group_arns_remote_logging_with_cloudwatch_handler( - self, remote_base_log_folder, log_group_arn - ): - """Test if the correct ARNs are configured for Cloudwatch""" - from airflow.config_templates import airflow_local_settings - from airflow.logging_config import configure_logging - - with conf_vars( - { - ("logging", "remote_logging"): "True", - ("logging", "remote_log_conn_id"): "some_cloudwatch", - ("logging", "remote_base_log_folder"): remote_base_log_folder, - } - ): - importlib.reload(airflow_local_settings) - configure_logging() - assert ( - airflow_local_settings.DEFAULT_LOGGING_CONFIG["handlers"]["task"]["log_group_arn"] - == log_group_arn - ) - - def test_loading_remote_logging_with_kwargs(self): - """Test if logging can be configured successfully with kwargs""" - pytest.importorskip("airflow.providers.amazon", reason="'amazon' provider not installed") - from airflow.config_templates import airflow_local_settings - from airflow.logging_config import configure_logging - from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler - - with conf_vars( - { - ("logging", "remote_logging"): "True", - ("logging", "remote_log_conn_id"): "some_s3", - ("logging", "remote_base_log_folder"): "s3://some-folder", - ("logging", "remote_task_handler_kwargs"): '{"delete_local_copy": true}', - } - ): - importlib.reload(airflow_local_settings) - configure_logging() - - logger = logging.getLogger("airflow.task") - assert isinstance(logger.handlers[0], S3TaskHandler) - assert getattr(logger.handlers[0], "delete_local_copy") is True diff --git a/airflow-core/tests/unit/core/test_stats.py b/airflow-core/tests/unit/core/test_stats.py index a297492dbf8..cff52656091 100644 --- a/airflow-core/tests/unit/core/test_stats.py +++ b/airflow-core/tests/unit/core/test_stats.py @@ -375,11 +375,11 @@ class TestPatternValidatorConfigOption: @conf_vars({**stats_on, **block_list, ("metrics", "metrics_allow_list"): "baz,qux"}) def test_setting_allow_and_block_logs_warning(self, caplog): - importlib.reload(airflow.stats) - - assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient) - assert type(airflow.stats.Stats.instance.metrics_validator) is PatternAllowListValidator with caplog.at_level(logging.WARNING): + importlib.reload(airflow.stats) + + assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient) + assert type(airflow.stats.Stats.instance.metrics_validator) is PatternAllowListValidator assert "Ignoring metrics_block_list" in caplog.text diff --git a/airflow-core/tests/unit/sensors/test_external_task_sensor.py b/airflow-core/tests/unit/sensors/test_external_task_sensor.py index b8f9b477aa8..947c125f8be 100644 --- a/airflow-core/tests/unit/sensors/test_external_task_sensor.py +++ b/airflow-core/tests/unit/sensors/test_external_task_sensor.py @@ -393,8 +393,7 @@ class TestExternalTaskSensor: caplog.clear() op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) assert ( - f"Poking for tasks ['{TEST_TASK_ID}'] " - f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... " + f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... " ) in caplog.messages def test_external_task_sensor_external_task_ids_param(self, caplog): @@ -412,8 +411,7 @@ class TestExternalTaskSensor: caplog.clear() op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) assert ( - f"Poking for tasks ['{TEST_TASK_ID}'] " - f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... 
" + f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... " ) in caplog.messages def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, caplog): @@ -1431,13 +1429,13 @@ def dag_bag_cyclic(): dags.append(dag) task_a = ExternalTaskSensor( task_id=f"task_a_{n}", - external_dag_id=f"dag_{n-1}", - external_task_id=f"task_b_{n-1}", + external_dag_id=f"dag_{n - 1}", + external_task_id=f"task_b_{n - 1}", ) task_b = ExternalTaskMarker( task_id=f"task_b_{n}", - external_dag_id=f"dag_{n+1}", - external_task_id=f"task_a_{n+1}", + external_dag_id=f"dag_{n + 1}", + external_task_id=f"task_a_{n + 1}", recursion_depth=3, ) task_a >> task_b @@ -1447,8 +1445,8 @@ def dag_bag_cyclic(): dags.append(dag) task_a = ExternalTaskSensor( task_id=f"task_a_{depth}", - external_dag_id=f"dag_{depth-1}", - external_task_id=f"task_b_{depth-1}", + external_dag_id=f"dag_{depth - 1}", + external_task_id=f"task_b_{depth - 1}", ) task_b = ExternalTaskMarker( task_id=f"task_b_{depth}", diff --git a/airflow-core/tests/unit/utils/log/test_log_reader.py b/airflow-core/tests/unit/utils/log/test_log_reader.py index 150425eb5d1..e4e7bb0ea15 100644 --- a/airflow-core/tests/unit/utils/log/test_log_reader.py +++ b/airflow-core/tests/unit/utils/log/test_log_reader.py @@ -18,10 +18,10 @@ from __future__ import annotations import copy import datetime -import logging import os import sys import tempfile +import types from typing import TYPE_CHECKING from unittest import mock @@ -78,16 +78,26 @@ class TestLogView: @pytest.fixture(autouse=True) def configure_loggers(self, log_dir, settings_folder): - logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) - logging_config["handlers"]["task"]["base_log_folder"] = log_dir - settings_file = os.path.join(settings_folder, "airflow_local_settings_test.py") - with open(settings_file, "w") as handle: - new_logging_file = f"LOGGING_CONFIG = {logging_config}" - handle.writelines(new_logging_file) + logging_config = {**DEFAULT_LOGGING_CONFIG} + logging_config["handlers"] = {**logging_config["handlers"]} + logging_config["handlers"]["task"] = { + **logging_config["handlers"]["task"], + "base_log_folder": log_dir, + } + + mod = types.SimpleNamespace() + mod.LOGGING_CONFIG = logging_config + + # "Inject" a fake module into sys so it loads it without needing to write valid python code + sys.modules["airflow_local_settings_test"] = mod + with conf_vars({("logging", "logging_config_class"): "airflow_local_settings_test.LOGGING_CONFIG"}): settings.configure_logging() - yield - logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) + try: + yield + finally: + del sys.modules["airflow_local_settings_test"] + settings.configure_logging() @pytest.fixture(autouse=True) def prepare_log_files(self, log_dir): diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index f94c83fe92b..ba32e250f25 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -36,6 +36,7 @@ from pydantic import TypeAdapter from pydantic.v1.utils import deep_update from requests.adapters import Response +from airflow import settings from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.executors import executor_constants, executor_loader from airflow.jobs.job import Job @@ -61,7 +62,7 @@ from airflow.utils.types import DagRunType from tests_common.test_utils.config import conf_vars -pytestmark = pytest.mark.db_test 
+pytestmark = [pytest.mark.db_test, pytest.mark.xfail()] DEFAULT_DATE = datetime(2016, 1, 1) TASK_LOGGER = "airflow.task" @@ -88,8 +89,7 @@ class TestFileTaskLogHandler: session.query(TaskInstance).delete() def setup_method(self): - logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) - logging.root.disabled = False + settings.configure_logging() self.clean_up() # We use file task handler by default. diff --git a/airflow-core/tests/unit/utils/test_logging_mixin.py b/airflow-core/tests/unit/utils/test_logging_mixin.py index 14e3532074b..4ad65db7fb1 100644 --- a/airflow-core/tests/unit/utils/test_logging_mixin.py +++ b/airflow-core/tests/unit/utils/test_logging_mixin.py @@ -93,18 +93,6 @@ class TestLoggingMixin: assert DummyClass().log.name == "unit.utils.test_logging_mixin.DummyClass" - def test_logger_name_is_root_when_logger_name_is_empty_string(self): - """ - Ensure that when `_logger_name` is set as an empty string, the resulting logger name is an empty - string too, which result in a logger with 'root' as name. - Note: Passing an empty string to `logging.getLogger` will create a logger with name 'root'. - """ - - class EmptyStringLogger(LoggingMixin): - _logger_name: str | None = "" - - assert EmptyStringLogger().log.name == "root" - def test_log_config_logger_name_correctly_prefix_logger_name(self): """ Ensure that when a class has `_log_config_logger_name`, it is used as prefix in the final logger diff --git a/airflow-core/tests/unit/utils/test_process_utils.py b/airflow-core/tests/unit/utils/test_process_utils.py index 7edc5f28b73..47ffc9245f3 100644 --- a/airflow-core/tests/unit/utils/test_process_utils.py +++ b/airflow-core/tests/unit/utils/test_process_utils.py @@ -103,19 +103,22 @@ class TestReapProcessGroup: class TestExecuteInSubProcess: def test_should_print_all_messages1(self, caplog): execute_in_subprocess(["bash", "-c", "echo CAT; echo KITTY;"]) - msgs = [record.getMessage() for record in caplog.records] - assert msgs == ["Executing cmd: bash -c 'echo CAT; echo KITTY;'", "Output:", "CAT", "KITTY"] + assert caplog.messages == [ + "Executing cmd: bash -c 'echo CAT; echo KITTY;'", + "Output:", + "CAT", + "KITTY", + ] def test_should_print_all_messages_from_cwd(self, caplog, tmp_path): execute_in_subprocess(["bash", "-c", "echo CAT; pwd; echo KITTY;"], cwd=str(tmp_path)) - msgs = [record.getMessage() for record in caplog.records] assert [ "Executing cmd: bash -c 'echo CAT; pwd; echo KITTY;'", "Output:", "CAT", str(tmp_path), "KITTY", - ] == msgs + ] == caplog.messages def test_using_env_works(self, caplog): execute_in_subprocess(["bash", "-c", 'echo "My value is ${VALUE}"'], env=dict(VALUE="1")) diff --git a/airflow-core/tests/unit/utils/test_task_handler_with_custom_formatter.py b/airflow-core/tests/unit/utils/test_task_handler_with_custom_formatter.py deleted file mode 100644 index 118e442d0c1..00000000000 --- a/airflow-core/tests/unit/utils/test_task_handler_with_custom_formatter.py +++ /dev/null @@ -1,115 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import logging - -import pytest - -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG -from airflow.models.taskinstance import TaskInstance -from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.utils.log.logging_mixin import set_context -from airflow.utils.state import DagRunState -from airflow.utils.timezone import datetime -from airflow.utils.types import DagRunType - -from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.db import clear_db_runs -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - -pytestmark = pytest.mark.db_test - - -DEFAULT_DATE = datetime(2019, 1, 1) -TASK_HANDLER = "task" -TASK_HANDLER_CLASS = "airflow.utils.log.task_handler_with_custom_formatter.TaskHandlerWithCustomFormatter" -PREV_TASK_HANDLER = DEFAULT_LOGGING_CONFIG["handlers"]["task"] - -DAG_ID = "task_handler_with_custom_formatter_dag" -TASK_ID = "task_handler_with_custom_formatter_task" - - [email protected](scope="module", autouse=True) -def custom_task_log_handler_config(): - DEFAULT_LOGGING_CONFIG["handlers"]["task"] = { - "class": TASK_HANDLER_CLASS, - "formatter": "airflow", - "stream": "sys.stdout", - } - logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) - logging.root.disabled = False - yield - DEFAULT_LOGGING_CONFIG["handlers"]["task"] = PREV_TASK_HANDLER - logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) - - [email protected] -def task_instance(dag_maker): - with dag_maker(DAG_ID, start_date=DEFAULT_DATE, serialized=True) as dag: - task = EmptyOperator(task_id=TASK_ID) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dagrun = dag_maker.create_dagrun( - state=DagRunState.RUNNING, - logical_date=DEFAULT_DATE, - run_type=DagRunType.MANUAL, - data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), - **triggered_by_kwargs, - ) - ti = TaskInstance(task=task, run_id=dagrun.run_id) - ti.log.disabled = False - yield ti - clear_db_runs() - - -def assert_prefix_once(task_instance: TaskInstance, prefix: str) -> None: - handler = next((h for h in task_instance.log.handlers if h.name == TASK_HANDLER), None) - assert handler is not None, "custom task log handler not set up correctly" - assert handler.formatter is not None, "custom task log formatter not set up correctly" - previous_formatter = handler.formatter - expected_format = f"{prefix}:{handler.formatter._fmt}" - set_context(task_instance.log, task_instance) - assert expected_format == handler.formatter._fmt - handler.setFormatter(previous_formatter) - - -def assert_prefix_multiple(task_instance: TaskInstance, prefix: str) -> None: - handler = next((h for h in task_instance.log.handlers if h.name == TASK_HANDLER), None) - assert handler is not None, "custom task log handler not set up correctly" - assert handler.formatter is not None, "custom task log formatter not set up correctly" - previous_formatter = handler.formatter - expected_format = 
f"{prefix}:{handler.formatter._fmt}" - set_context(task_instance.log, task_instance) - set_context(task_instance.log, task_instance) - set_context(task_instance.log, task_instance) - assert expected_format == handler.formatter._fmt - handler.setFormatter(previous_formatter) - - -def test_custom_formatter_default_format(task_instance): - """The default format provides no prefix.""" - assert_prefix_once(task_instance, "") - - -@conf_vars({("logging", "task_log_prefix_template"): "{{ ti.dag_id }}-{{ ti.task_id }}"}) -def test_custom_formatter_custom_format_not_affected_by_config(task_instance): - """Certifies that the prefix is only added once, even after repeated calls""" - assert_prefix_multiple(task_instance, f"{DAG_ID}-{TASK_ID}") diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py index ad885d176c7..3a1ac156170 100644 --- a/devel-common/src/tests_common/pytest_plugin.py +++ b/devel-common/src/tests_common/pytest_plugin.py @@ -18,6 +18,7 @@ from __future__ import annotations import json +import logging import os import platform import re @@ -1767,7 +1768,7 @@ def add_expected_folders_to_pythonpath(): @pytest.fixture -def cap_structlog(): +def cap_structlog(request, monkeypatch): """ Test that structlog messages are logged. @@ -1782,59 +1783,46 @@ def cap_structlog(): ... ... assert "not logged" not in cap_structlog # not in works too """ - import structlog.testing - from structlog import configure, get_config - - class LogCapture(structlog.testing.LogCapture): - def __contains__(self, target): - import operator - - if isinstance(target, str): - - def predicate(e): - return e["event"] == target - elif isinstance(target, dict): - # Partial comparison -- only check keys passed in - get = operator.itemgetter(*target.keys()) - want = tuple(target.values()) - - def predicate(e): - try: - return get(e) == want - except Exception: - return False - else: - raise TypeError(f"Can't search logs using {type(target)}") - - return any(predicate(e) for e in self.entries) - - def __getitem__(self, i): - return self.entries[i] + import structlog.stdlib + from structlog import DropEvent, configure, get_config - def __iter__(self): - return iter(self.entries) + from tests_common.test_utils.logs import StructlogCapture - def __repr__(self): - return repr(self.entries) - - @property - def text(self): - """All the event text as a single multi-line string.""" - return "\n".join(e["event"] for e in self.entries) - - cap = LogCapture() + cap = StructlogCapture() # Modify `_Configuration.default_processors` set via `configure` but always # keep the list instance intact to not break references held by bound # loggers. 
    processors = get_config()["processors"]
    old_processors = processors.copy()
+
+    # And modify the stdlib logging to capture too
+    handler = logging.root.handlers[0]
+    if not isinstance(handler.formatter, structlog.stdlib.ProcessorFormatter):
+        raise AssertionError(
+            f"{type(handler.formatter)} is not an instance of structlog.stdlib.ProcessorFormatter"
+        )
+
+    std_formatter = structlog.stdlib.ProcessorFormatter(
+        foreign_pre_chain=handler.formatter.foreign_pre_chain,
+        pass_foreign_args=True,
+        processor=cap,
+    )
+
+    def stdlib_filter(record):
+        with suppress(DropEvent):
+            std_formatter.format(record)
+        return False
+
     try:
         # clear processors list and use LogCapture for testing
         processors.clear()
         processors.append(cap)
         configure(processors=processors)
+        monkeypatch.setattr(handler, "level", logging.DEBUG)
+        monkeypatch.setattr(handler, "filters", [stdlib_filter])
         yield cap
     finally:
+        cap._finalize()
         # remove LogCapture and restore original processors
         processors.clear()
         processors.extend(old_processors)
@@ -1849,6 +1837,8 @@ def override_caplog(request):
     This is in an effort to reduce flakiness from caplog related tests where one test file can change
     log behaviour and bleed in to affecting other test files
     """
+    yield request.getfixturevalue("cap_structlog")
+    return
     # We need this `_ispytest` so it doesn't warn about using private
     fixture = pytest.LogCaptureFixture(request.node, _ispytest=True)
     yield fixture
diff --git a/devel-common/src/tests_common/test_utils/logs.py b/devel-common/src/tests_common/test_utils/logs.py
index 03cbe0b55b9..511d8f87b5d 100644
--- a/devel-common/src/tests_common/test_utils/logs.py
+++ b/devel-common/src/tests_common/test_utils/logs.py
@@ -18,10 +18,16 @@ from __future__ import annotations
 
 import json
+import logging
+from contextlib import contextmanager
+from typing import TYPE_CHECKING, NoReturn
 
 from airflow.models import Log
 from airflow.sdk.execution_time.secrets_masker import DEFAULT_SENSITIVE_FIELDS
 
+if TYPE_CHECKING:
+    from structlog.typing import EventDict, WrappedLogger
+
 
 def _masked_value_check(data, sensitive_fields):
     """
@@ -69,3 +75,188 @@ def check_last_log(session, dag_id, event, logical_date, expected_extra=None, ch
         _masked_value_check(extra_json, DEFAULT_SENSITIVE_FIELDS)
 
     session.query(Log).delete()
+
+
+class StructlogCapture:
+    """
+    Test that structlog messages are logged.
+
+    This extends the feature built into structlog to make it easier to find if a message is logged.
+
+    >>> def test_something(cap_structlog):
+    ...     log.info("some event", field1=False, field2=[1, 2])
+    ...     log.info("some event", field1=True)
+    ...     assert "some event" in cap_structlog  # a string searches on `event` field
+    ...     assert {"event": "some event", "field1": True} in cap_structlog  # Searches only on passed fields
+    ...     assert {"field2": [1, 2]} in cap_structlog
+    ...
+    ...     assert "not logged" not in cap_structlog  # not in works too
+
+    This fixture class will also manage the log level of stdlib loggers via ``at_level`` and ``set_level``.
+    """
+
+    # This class is a manual mix of pytest's LogCaptureFixture and structlog's LogCapture class, but
+    # tailored to Airflow's "send all logs via structlog" approach
+
+    _logger: str | None = None
+    """The logger we specifically want to capture log messages from"""
+
+    def __init__(self):
+        self.entries = []
+        self._initial_logger_levels: dict[str | None, int] = {}
+
+    def _finalize(self) -> None:
+        """
+        Finalize the fixture.
+
+        This restores the log levels changed by :meth:`set_level`.
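+        The ``cap_structlog`` fixture calls this automatically during teardown, so tests normally
+        never need to invoke it directly.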
+ """ + from airflow._logging.structlog import PER_LOGGER_LEVELS + + for logger_name, level in self._initial_logger_levels.items(): + logger = logging.getLogger(logger_name) + logger.setLevel(level) + if level is logging.NOTSET: + del PER_LOGGER_LEVELS[logger_name] + else: + PER_LOGGER_LEVELS[logger_name] = level + + def __contains__(self, target): + import operator + + if isinstance(target, str): + + def predicate(e): + return e["event"] == target + elif isinstance(target, dict): + # Partial comparison -- only check keys passed in + get = operator.itemgetter(*target.keys()) + want = tuple(target.values()) + + def predicate(e): + try: + return get(e) == want + except Exception: + return False + else: + raise TypeError(f"Can't search logs using {type(target)}") + + return any(predicate(e) for e in self.entries) + + def __getitem__(self, i): + return self.entries[i] + + def __iter__(self): + return iter(self.entries) + + def __repr__(self): + return repr(self.entries) + + def __call__(self, logger: WrappedLogger, method_name: str, event_dict: EventDict) -> NoReturn: + from structlog import DropEvent + from structlog._log_levels import map_method_name + + from airflow._logging.structlog import ( + NamedBytesLogger, + NamedWriteLogger, + ) + + logger_name = ( + event_dict.get("logger_name") + or event_dict.get("logger") + or (isinstance(logger, (NamedBytesLogger, NamedWriteLogger)) and logger.name) + or "" + ) + if not self._logger or logger_name.startswith(self._logger): + event_dict["log_level"] = map_method_name(method_name) + self.entries.append(event_dict) + + raise DropEvent + + @property + def text(self): + """All the event text as a single multi-line string.""" + return "\n".join(e["event"] for e in self.entries) + + # These next fns make it duck-type the same as Pytests "caplog" fixture + @property + def messages(self): + """All the event messages as a list.""" + return [e["event"] for e in self.entries] + + def _force_enable_logging(self, level: int, logger_obj: logging.Logger) -> int: + """ + Enable the desired logging level if the global level was disabled via ``logging.disabled``. + + Only enables logging levels greater than or equal to the requested ``level``. + + Does nothing if the desired ``level`` wasn't disabled. + + :param level: + The logger level caplog should capture. + All logging is enabled if a non-standard logging level string is supplied. + Valid level strings are in :data:`logging._nameToLevel`. + :param logger_obj: The logger object to check. + + :return: The original disabled logging level. + """ + original_disable_level: int = logger_obj.manager.disable + + if not logger_obj.isEnabledFor(level): + # Each level is `10` away from other levels. 
+ # https://docs.python.org/3/library/logging.html#logging-levels + disable_level = max(level - 10, logging.NOTSET) + logging.disable(disable_level) + + return original_disable_level + + @contextmanager + def at_level(self, level: str | int, logger=None): + from airflow._logging.structlog import NAME_TO_LEVEL, PER_LOGGER_LEVELS + + if isinstance(level, str): + level = NAME_TO_LEVEL[level.lower()] + + key = logger or "" + old = PER_LOGGER_LEVELS.get(key, logging.NOTSET) + PER_LOGGER_LEVELS[key] = level + stdlogger = logging.getLogger(key) + stdlogger.setLevel(level) + hdlr = orig_hdlr_level = None + if stdlogger.handlers: + hdlr = stdlogger.handlers[0] + orig_hdlr_level = hdlr.level + hdlr.setLevel(level) + try: + yield self + finally: + stdlogger.setLevel(old) + if hdlr is not None: + hdlr.setLevel(orig_hdlr_level) + if old is logging.NOTSET: + del PER_LOGGER_LEVELS[key] + else: + PER_LOGGER_LEVELS[key] = old + + def set_level(self, level: str | int, logger=None): + from airflow._logging.structlog import NAME_TO_LEVEL, PER_LOGGER_LEVELS + + # Set the global level + if isinstance(level, str): + level = NAME_TO_LEVEL[level.lower()] + + key = logger or "" + + stdlogger = logging.getLogger(key) + self._initial_logger_levels[key] = PER_LOGGER_LEVELS.get(key, logging.NOTSET) + + PER_LOGGER_LEVELS[key] = level + stdlogger.setLevel(level) + self._logger = logger + + def clear(self): + self.entries = [] + + @property + def records(self): + return self.entries
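For anyone trying the branch: a minimal sketch of how the reworked fixture is meant to be used in a test, going by the `StructlogCapture` API added above (the test function and logger name are illustrative, not part of the patch):

    import structlog

    log = structlog.get_logger("airflow.example")


    def test_capture(cap_structlog):
        # A bare string matches against the `event` field of any captured entry.
        log.info("task started", try_number=1)
        assert "task started" in cap_structlog

        # A dict is compared partially, on just the keys passed in.
        assert {"event": "task started", "try_number": 1} in cap_structlog

        # caplog-style accessors also work, via the duck-typed properties.
        assert cap_structlog.messages == ["task started"]

        # Temporarily lower the effective level for one logger subtree.
        with cap_structlog.at_level("debug", logger="airflow.example"):
            log.debug("verbose detail")
        assert "verbose detail" in cap_structlog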
