This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c104a7d8dc Once again redact JWT tokens in task logs (#55499)
1c104a7d8dc is described below

commit 1c104a7d8dc6018e4121f688c460d509cb41c61c
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Sep 11 11:18:53 2025 +0100

    Once again redact JWT tokens in task logs (#55499)
    
    The version of redact_jwt that got copied across into the shared library was
    not the working version (it only redacted JWT-like things when the string
    started with them). I have fixed this by moving across the right version,
    and, to prevent future confusion, I have also removed it and all the other
    now-unused processors from airflow.sdk.log.
    
    Fixes #55466
---
 .../src/airflow_shared/logging/structlog.py        |  5 +-
 task-sdk/src/airflow/sdk/log.py                    | 98 +---------------------
 2 files changed, 5 insertions(+), 98 deletions(-)

diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py
index 6e66c50ab60..bc381dfbc7d 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -53,6 +53,7 @@ __all__ = [
     "structlog_processors",
 ]
 
+JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*")
 
 LEVEL_TO_FILTERING_LOGGER: dict[int, type[Logger]] = {}
 
@@ -202,8 +203,8 @@ def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDi
 # token. Better safe than sorry
 def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
     for k, v in event_dict.items():
-        if isinstance(v, str) and v.startswith("eyJ"):
-            event_dict[k] = "eyJ***"
+        if isinstance(v, str):
+            event_dict[k] = re.sub(JWT_PATTERN, "eyJ***", v)
     return event_dict
 
 
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 28b51d08d6d..5923848ec8d 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -17,14 +17,10 @@
 # under the License.
 from __future__ import annotations
 
-import itertools
-import logging.config
-import re
-import sys
 import warnings
 from functools import cache
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar
+from typing import TYPE_CHECKING, Any, BinaryIO, TextIO
 
 import structlog
 
@@ -33,9 +29,7 @@ import structlog
 from pydantic import JsonValue  # noqa: TC002
 
 if TYPE_CHECKING:
-    from collections.abc import Callable
-
-    from structlog.typing import EventDict, ExcInfo, FilteringBoundLogger, Processor
+    from structlog.typing import EventDict, FilteringBoundLogger, Processor
 
     from airflow.logging_config import RemoteLogIO
     from airflow.sdk.types import Logger, RuntimeTaskInstanceProtocol as RuntimeTI
@@ -44,65 +38,6 @@ if TYPE_CHECKING:
 __all__ = ["configure_logging", "reset_logging", "mask_secret"]
 
 
-JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*")
-
-
-def exception_group_tracebacks(
-    format_exception: Callable[[ExcInfo], list[dict[str, Any]]],
-) -> Processor:
-    # Make mypy happy
-    if not hasattr(__builtins__, "BaseExceptionGroup"):
-        T = TypeVar("T")
-
-        class BaseExceptionGroup(Generic[T]):
-            exceptions: list[T]
-
-    def _exception_group_tracebacks(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
-        if exc_info := event_dict.get("exc_info", None):
-            group: BaseExceptionGroup[Exception] | None = None
-            if exc_info is True:
-                # `log.exception('mesg")` case
-                exc_info = sys.exc_info()
-                if exc_info[0] is None:
-                    exc_info = None
-
-            if (
-                isinstance(exc_info, tuple)
-                and len(exc_info) == 3
-                and isinstance(exc_info[1], BaseExceptionGroup)
-            ):
-                group = exc_info[1]
-            elif isinstance(exc_info, BaseExceptionGroup):
-                group = exc_info
-
-            if group:
-                # Only remove it from event_dict if we handle it
-                del event_dict["exc_info"]
-                event_dict["exception"] = list(
-                    itertools.chain.from_iterable(
-                        format_exception((type(exc), exc, exc.__traceback__))  # type: ignore[attr-defined,arg-type]
-                        for exc in (*group.exceptions, group)
-                    )
-                )
-
-        return event_dict
-
-    return _exception_group_tracebacks
-
-
-def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
-    if logger_name := event_dict.pop("logger_name", None):
-        event_dict.setdefault("logger", logger_name)
-    return event_dict
-
-
-def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
-    for k, v in event_dict.items():
-        if isinstance(v, str):
-            event_dict[k] = re.sub(JWT_PATTERN, "eyJ***", v)
-    return event_dict
-
-
 def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
     from airflow.sdk._shared.secrets_masker import redact
 
@@ -110,35 +45,6 @@ def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict
     return event_dict
 
 
-def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
-    event_dict.pop("positional_args", None)
-    return event_dict
-
-
-class StdBinaryStreamHandler(logging.StreamHandler):
-    """A logging.StreamHandler that sends logs as binary JSON over the given 
stream."""
-
-    stream: BinaryIO
-
-    def __init__(self, stream: BinaryIO):
-        super().__init__(stream)
-
-    def emit(self, record: logging.LogRecord):
-        try:
-            msg = self.format(record)
-            buffer = bytearray(msg, "utf-8", "backslashreplace")
-
-            buffer += b"\n"
-
-            stream = self.stream
-            stream.write(buffer)
-            self.flush()
-        except RecursionError:  # See issue 36272
-            raise
-        except Exception:
-            self.handleError(record)
-
-
 @cache
 def logging_processors(
     json_output: bool,

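For illustration, here is a minimal standalone sketch (not part of the
commit) of why the old startswith() check leaked tokens while the regex
substitution does not. The processor signature mirrors the structlog
processors in the diff above; the sample event_dict and token are made up:

    import re

    JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*")

    def redact_jwt_old(logger, method_name, event_dict: dict) -> dict:
        # Broken version: redacts only when the whole value starts with
        # "eyJ", so a token embedded mid-string passes through untouched.
        for k, v in event_dict.items():
            if isinstance(v, str) and v.startswith("eyJ"):
                event_dict[k] = "eyJ***"
        return event_dict

    def redact_jwt_new(logger, method_name, event_dict: dict) -> dict:
        # Fixed version: substitutes every JWT-like substring anywhere in
        # the value, as the shared-library processor in this commit does.
        for k, v in event_dict.items():
            if isinstance(v, str):
                event_dict[k] = re.sub(JWT_PATTERN, "eyJ***", v)
        return event_dict

    msg = {"event": "Auth header: Bearer eyJhbGciOiJIUzI1NiJ9.e30.x1y2 set"}
    print(redact_jwt_old(None, "info", dict(msg)))
    # token leaks: the value starts with "Auth", not "eyJ"
    print(redact_jwt_new(None, "info", dict(msg)))
    # {'event': 'Auth header: Bearer eyJ*** set'}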