This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0ef71f17beb01150627cdbc0a429d58fdb9750d9 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Thu Sep 11 11:18:53 2025 +0100 Once again redact JWT tokens in task logs (#55499) The version of redact_jwt that got copied across into the shared library was not the working version (it only redacted jwt-like things when the string started with them) -- I have fixed this by moving across the right version, and also to prevent future confusion I have removed it, and all the other now unused processors from airflow.sdk.log Fixes #55466 (cherry picked from commit 1c104a7d8dc6018e4121f688c460d509cb41c61c) --- .../src/airflow_shared/logging/structlog.py | 5 +- task-sdk/src/airflow/sdk/log.py | 98 +--------------------- 2 files changed, 5 insertions(+), 98 deletions(-) diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py index 6e66c50ab60..bc381dfbc7d 100644 --- a/shared/logging/src/airflow_shared/logging/structlog.py +++ b/shared/logging/src/airflow_shared/logging/structlog.py @@ -53,6 +53,7 @@ __all__ = [ "structlog_processors", ] +JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*") LEVEL_TO_FILTERING_LOGGER: dict[int, type[Logger]] = {} @@ -202,8 +203,8 @@ def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDi # token. Better safe than sorry def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: for k, v in event_dict.items(): - if isinstance(v, str) and v.startswith("eyJ"): - event_dict[k] = "eyJ***" + if isinstance(v, str): + event_dict[k] = re.sub(JWT_PATTERN, "eyJ***", v) return event_dict diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index 28b51d08d6d..5923848ec8d 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -17,14 +17,10 @@ # under the License. 
from __future__ import annotations -import itertools -import logging.config -import re -import sys import warnings from functools import cache from pathlib import Path -from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar +from typing import TYPE_CHECKING, Any, BinaryIO, TextIO import structlog @@ -33,9 +29,7 @@ import structlog from pydantic import JsonValue # noqa: TC002 if TYPE_CHECKING: - from collections.abc import Callable - - from structlog.typing import EventDict, ExcInfo, FilteringBoundLogger, Processor + from structlog.typing import EventDict, FilteringBoundLogger, Processor from airflow.logging_config import RemoteLogIO from airflow.sdk.types import Logger, RuntimeTaskInstanceProtocol as RuntimeTI @@ -44,65 +38,6 @@ if TYPE_CHECKING: __all__ = ["configure_logging", "reset_logging", "mask_secret"] -JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*") - - -def exception_group_tracebacks( - format_exception: Callable[[ExcInfo], list[dict[str, Any]]], -) -> Processor: - # Make mypy happy - if not hasattr(__builtins__, "BaseExceptionGroup"): - T = TypeVar("T") - - class BaseExceptionGroup(Generic[T]): - exceptions: list[T] - - def _exception_group_tracebacks(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict: - if exc_info := event_dict.get("exc_info", None): - group: BaseExceptionGroup[Exception] | None = None - if exc_info is True: - # `log.exception('mesg")` case - exc_info = sys.exc_info() - if exc_info[0] is None: - exc_info = None - - if ( - isinstance(exc_info, tuple) - and len(exc_info) == 3 - and isinstance(exc_info[1], BaseExceptionGroup) - ): - group = exc_info[1] - elif isinstance(exc_info, BaseExceptionGroup): - group = exc_info - - if group: - # Only remove it from event_dict if we handle it - del event_dict["exc_info"] - event_dict["exception"] = list( - itertools.chain.from_iterable( - format_exception((type(exc), exc, exc.__traceback__)) # type: ignore[attr-defined,arg-type] - for exc in (*group.exceptions, 
group) - ) - ) - - return event_dict - - return _exception_group_tracebacks - - -def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict: - if logger_name := event_dict.pop("logger_name", None): - event_dict.setdefault("logger", logger_name) - return event_dict - - -def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: - for k, v in event_dict.items(): - if isinstance(v, str): - event_dict[k] = re.sub(JWT_PATTERN, "eyJ***", v) - return event_dict - - def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: from airflow.sdk._shared.secrets_masker import redact @@ -110,35 +45,6 @@ def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict return event_dict -def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict: - event_dict.pop("positional_args", None) - return event_dict - - -class StdBinaryStreamHandler(logging.StreamHandler): - """A logging.StreamHandler that sends logs as binary JSON over the given stream.""" - - stream: BinaryIO - - def __init__(self, stream: BinaryIO): - super().__init__(stream) - - def emit(self, record: logging.LogRecord): - try: - msg = self.format(record) - buffer = bytearray(msg, "utf-8", "backslashreplace") - - buffer += b"\n" - - stream = self.stream - stream.write(buffer) - self.flush() - except RecursionError: # See issue 36272 - raise - except Exception: - self.handleError(record) - - @cache def logging_processors( json_output: bool,
