This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0e4cbef8bd67a22569d50cf295fa9489cf77ed23 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Tue Sep 9 20:59:49 2025 +0100 Fixup to structlog migration - logging folder not being created correctly. (#55431) This was broken in #52651 with our move away from FileTaskHandler, and hidden when running in Breeze due to the default logs folder already existing. --- airflow-core/src/airflow/logging_config.py | 14 +++++- .../logging/src/airflow_shared/logging/__init__.py | 8 ++- .../src/airflow_shared/logging/structlog.py | 57 ++++++++++++++++++++++ task-sdk/src/airflow/sdk/log.py | 50 +++---------------- 4 files changed, 84 insertions(+), 45 deletions(-) diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py index cce8b37f684..d4a77782c34 100644 --- a/airflow-core/src/airflow/logging_config.py +++ b/airflow-core/src/airflow/logging_config.py @@ -85,7 +85,7 @@ def load_logging_config() -> tuple[dict[str, Any], str]: def configure_logging(): - from airflow._shared.logging.structlog import configure_logging + from airflow._shared.logging import configure_logging, init_log_folder logging_config, logging_class_path = load_logging_config() try: @@ -100,6 +100,18 @@ def configure_logging(): validate_logging_config() + new_folder_permissions = int( + conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), + 8, + ) + + base_log_folder = conf.get("logging", "base_log_folder") + + return init_log_folder( + base_log_folder, + new_folder_permissions=new_folder_permissions, + ) + return logging_class_path diff --git a/shared/logging/src/airflow_shared/logging/__init__.py b/shared/logging/src/airflow_shared/logging/__init__.py index 8f1cd395396..274c404a5c4 100644 --- a/shared/logging/src/airflow_shared/logging/__init__.py +++ b/shared/logging/src/airflow_shared/logging/__init__.py @@ -16,4 +16,10 @@ # under the License. 
from __future__ import annotations -from .structlog import configure_logging as configure_logging +__all__ = [ + "configure_logging", + "init_log_file", + "init_log_folder", +] + +from .structlog import configure_logging, init_log_file, init_log_folder diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py index 8510520474a..6e66c50ab60 100644 --- a/shared/logging/src/airflow_shared/logging/structlog.py +++ b/shared/logging/src/airflow_shared/logging/structlog.py @@ -25,6 +25,7 @@ import re import sys from collections.abc import Callable, Mapping, Sequence from functools import cache, cached_property, partial +from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast import pygtrie @@ -505,6 +506,62 @@ def configure_logging( logging.config.dictConfig(config) +def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: int): + """ + Prepare the log folder and ensure its mode is as configured. + + To handle log writing when tasks are impersonated, the log files need to + be writable by the user that runs the Airflow command and the user + that is impersonated. This is mainly to handle corner cases with the + SubDagOperator. When the SubDagOperator is run, all of the operators + run under the impersonated user and create appropriate log files + as the impersonated user. However, if the user manually runs tasks + of the SubDagOperator through the UI, then the log files are created + by the user that runs the Airflow command. For example, the Airflow + run command may be run by the `airflow_sudoable` user, but the Airflow + tasks may be run by the `airflow` user. If the log files are not + writable by both users, then it's possible that re-running a task + via the UI (or vice versa) results in a permission error as the task + tries to write to a log file created by the other user. 
+ + We leave it up to the user to manage their permissions by exposing configuration for both + new folders and new log files. Default is to make new log folders and files group-writeable + to handle most common impersonation use cases. The requirement in this case will be to make + sure that the same group is set as default group for both - impersonated user and main airflow + user. + """ + directory = Path(directory) + for parent in reversed(Path(directory).parents): + parent.mkdir(mode=new_folder_permissions, exist_ok=True) + directory.mkdir(mode=new_folder_permissions, exist_ok=True) + + +def init_log_file( + base_log_folder: str | os.PathLike[str], + local_relative_path: str | os.PathLike[str], + *, + new_folder_permissions: int = 0o775, + new_file_permissions: int = 0o664, +) -> Path: + """ + Ensure log file and parent directories are created with the correct permissions. + + Any directories that are missing are created with the right permission bits. + + See above ``init_log_folder`` method for more detailed explanation. + """ + full_path = Path(base_log_folder, local_relative_path) + init_log_folder(full_path.parent, new_folder_permissions) + + try: + full_path.touch(new_file_permissions) + except OSError as e: + log = structlog.get_logger(__name__) + log.warning("OSError while changing ownership of the log file. %s", e) + + return full_path + + if __name__ == "__main__": configure_logging( # json_output=True, diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index d57867a6ef9..28b51d08d6d 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -222,46 +222,14 @@ def logger_at_level(name: str, level: int) -> Logger: ) -def _prepare_log_folder(directory: Path, mode: int): - """ - Prepare the log folder and ensure its mode is as configured. 
- - To handle log writing when tasks are impersonated, the log files need to - be writable by the user that runs the Airflow command and the user - that is impersonated. This is mainly to handle corner cases with the - SubDagOperator. When the SubDagOperator is run, all of the operators - run under the impersonated user and create appropriate log files - as the impersonated user. However, if the user manually runs tasks - of the SubDagOperator through the UI, then the log files are created - by the user that runs the Airflow command. For example, the Airflow - run command may be run by the `airflow_sudoable` user, but the Airflow - tasks may be run by the `airflow` user. If the log files are not - writable by both users, then it's possible that re-running a task - via the UI (or vice versa) results in a permission error as the task - tries to write to a log file created by the other user. - - We leave it up to the user to manage their permissions by exposing configuration for both - new folders and new log files. Default is to make new log folders and files group-writeable - to handle most common impersonation use cases. The requirement in this case will be to make - sure that the same group is set as default group for both - impersonated user and main airflow - user. - """ - for parent in reversed(directory.parents): - parent.mkdir(mode=mode, exist_ok=True) - directory.mkdir(mode=mode, exist_ok=True) - - def init_log_file(local_relative_path: str) -> Path: """ Ensure log file and parent directories are created. Any directories that are missing are created with the right permission bits. - - See above ``_prepare_log_folder`` method for more detailed explanation. 
""" - # NOTE: This is duplicated from airflow.utils.log.file_task_handler:FileTaskHandler._init_file, but we - # want to remove that from airflow.configuration import conf + from airflow.sdk._shared.logging import init_log_file new_file_permissions = int( conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), @@ -273,17 +241,13 @@ def init_log_file(local_relative_path: str) -> Path: ) base_log_folder = conf.get("logging", "base_log_folder") - full_path = Path(base_log_folder, local_relative_path) - _prepare_log_folder(full_path.parent, new_folder_permissions) - - try: - full_path.touch(new_file_permissions) - except OSError as e: - log = structlog.get_logger(__name__) - log.warning("OSError while changing ownership of the log file. %s", e) - - return full_path + return init_log_file( + base_log_folder, + local_relative_path, + new_folder_permissions=new_folder_permissions, + new_file_permissions=new_file_permissions, + ) def load_remote_log_handler() -> RemoteLogIO | None:
