This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c22c58d1aa9 Fixup to structlog migration - logging folder not being created correctly. (#55431)
c22c58d1aa9 is described below
commit c22c58d1aa9596b5bc084a6769e1474a3b1d8d3a
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Sep 9 20:59:49 2025 +0100
Fixup to structlog migration - logging folder not being created correctly. (#55431)
This was broken in #52651 with our move away from FileTaskHandler, and hidden
when running in Breeze due to the default logs folder already existing.
---
airflow-core/src/airflow/logging_config.py | 14 +++++-
.../logging/src/airflow_shared/logging/__init__.py | 8 ++-
.../src/airflow_shared/logging/structlog.py | 57 ++++++++++++++++++++++
task-sdk/src/airflow/sdk/log.py | 50 +++----------------
4 files changed, 84 insertions(+), 45 deletions(-)
diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py
index cce8b37f684..d4a77782c34 100644
--- a/airflow-core/src/airflow/logging_config.py
+++ b/airflow-core/src/airflow/logging_config.py
@@ -85,7 +85,7 @@ def load_logging_config() -> tuple[dict[str, Any], str]:
def configure_logging():
- from airflow._shared.logging.structlog import configure_logging
+ from airflow._shared.logging import configure_logging, init_log_folder
logging_config, logging_class_path = load_logging_config()
try:
@@ -100,6 +100,18 @@ def configure_logging():
validate_logging_config()
+ new_folder_permissions = int(
+ conf.get("logging", "file_task_handler_new_folder_permissions",
fallback="0o775"),
+ 8,
+ )
+
+ base_log_folder = conf.get("logging", "base_log_folder")
+
+ return init_log_folder(
+ base_log_folder,
+ new_folder_permissions=new_folder_permissions,
+ )
+
return logging_class_path
diff --git a/shared/logging/src/airflow_shared/logging/__init__.py b/shared/logging/src/airflow_shared/logging/__init__.py
index 8f1cd395396..274c404a5c4 100644
--- a/shared/logging/src/airflow_shared/logging/__init__.py
+++ b/shared/logging/src/airflow_shared/logging/__init__.py
@@ -16,4 +16,10 @@
# under the License.
from __future__ import annotations
-from .structlog import configure_logging as configure_logging
+__all__ = [
+ "configure_logging",
+ "init_log_file",
+ "init_log_folder",
+]
+
+from .structlog import configure_logging, init_log_file, init_log_folder
diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py
index 8510520474a..6e66c50ab60 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -25,6 +25,7 @@ import re
import sys
from collections.abc import Callable, Mapping, Sequence
from functools import cache, cached_property, partial
+from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast
import pygtrie
@@ -505,6 +506,62 @@ def configure_logging(
logging.config.dictConfig(config)
+def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions:
int):
+ """
+ Prepare the log folder and ensure its mode is as configured.
+
+ To handle log writing when tasks are impersonated, the log files need to
+ be writable by the user that runs the Airflow command and the user
+ that is impersonated. This is mainly to handle corner cases with the
+ SubDagOperator. When the SubDagOperator is run, all of the operators
+ run under the impersonated user and create appropriate log files
+ as the impersonated user. However, if the user manually runs tasks
+ of the SubDagOperator through the UI, then the log files are created
+ by the user that runs the Airflow command. For example, the Airflow
+ run command may be run by the `airflow_sudoable` user, but the Airflow
+ tasks may be run by the `airflow` user. If the log files are not
+ writable by both users, then it's possible that re-running a task
+ via the UI (or vice versa) results in a permission error as the task
+ tries to write to a log file created by the other user.
+
+ We leave it up to the user to manage their permissions by exposing configuration for both
+ new folders and new log files. Default is to make new log folders and files group-writeable
+ to handle most common impersonation use cases. The requirement in this case will be to make
+ sure that the same group is set as default group for both - impersonated user and main airflow
+ user.
+ """
+ directory = Path(directory)
+ for parent in reversed(Path(directory).parents):
+ parent.mkdir(mode=new_folder_permissions, exist_ok=True)
+ directory.mkdir(mode=new_folder_permissions, exist_ok=True)
+
+
+def init_log_file(
+ base_log_folder: str | os.PathLike[str],
+ local_relative_path: str | os.PathLike[str],
+ *,
+ new_folder_permissions: int = 0o775,
+ new_file_permissions: int = 0o664,
+) -> Path:
+ """
+ Ensure log file and parent directories are created with the correct permissions.
+
+ Any directories that are missing are created with the right permission bits.
+
+ See above ``init_log_folder`` method for more detailed explanation.
+ """
+ full_path = Path(base_log_folder, local_relative_path)
+ init_log_folder(full_path.parent, new_folder_permissions)
+
+ try:
+ full_path.touch(new_file_permissions)
+ except OSError as e:
+ log = structlog.get_logger(__name__)
+ log.warning("OSError while changing ownership of the log file. %s", e)
+
+ return full_path
+
+
if __name__ == "__main__":
configure_logging(
# json_output=True,
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index d57867a6ef9..28b51d08d6d 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -222,46 +222,14 @@ def logger_at_level(name: str, level: int) -> Logger:
)
-def _prepare_log_folder(directory: Path, mode: int):
- """
- Prepare the log folder and ensure its mode is as configured.
-
- To handle log writing when tasks are impersonated, the log files need to
- be writable by the user that runs the Airflow command and the user
- that is impersonated. This is mainly to handle corner cases with the
- SubDagOperator. When the SubDagOperator is run, all of the operators
- run under the impersonated user and create appropriate log files
- as the impersonated user. However, if the user manually runs tasks
- of the SubDagOperator through the UI, then the log files are created
- by the user that runs the Airflow command. For example, the Airflow
- run command may be run by the `airflow_sudoable` user, but the Airflow
- tasks may be run by the `airflow` user. If the log files are not
- writable by both users, then it's possible that re-running a task
- via the UI (or vice versa) results in a permission error as the task
- tries to write to a log file created by the other user.
-
- We leave it up to the user to manage their permissions by exposing configuration for both
- new folders and new log files. Default is to make new log folders and files group-writeable
- to handle most common impersonation use cases. The requirement in this case will be to make
- sure that the same group is set as default group for both - impersonated user and main airflow
- user.
- """
- for parent in reversed(directory.parents):
- parent.mkdir(mode=mode, exist_ok=True)
- directory.mkdir(mode=mode, exist_ok=True)
-
-
def init_log_file(local_relative_path: str) -> Path:
"""
Ensure log file and parent directories are created.
Any directories that are missing are created with the right permission bits.
-
- See above ``_prepare_log_folder`` method for more detailed explanation.
"""
- # NOTE: This is duplicated from airflow.utils.log.file_task_handler:FileTaskHandler._init_file, but we
- # want to remove that
from airflow.configuration import conf
+ from airflow.sdk._shared.logging import init_log_file
new_file_permissions = int(
conf.get("logging", "file_task_handler_new_file_permissions",
fallback="0o664"),
@@ -273,17 +241,13 @@ def init_log_file(local_relative_path: str) -> Path:
)
base_log_folder = conf.get("logging", "base_log_folder")
- full_path = Path(base_log_folder, local_relative_path)
- _prepare_log_folder(full_path.parent, new_folder_permissions)
-
- try:
- full_path.touch(new_file_permissions)
- except OSError as e:
- log = structlog.get_logger(__name__)
- log.warning("OSError while changing ownership of the log file. %s", e)
-
- return full_path
+ return init_log_file(
+ base_log_folder,
+ local_relative_path,
+ new_folder_permissions=new_folder_permissions,
+ new_file_permissions=new_file_permissions,
+ )
def load_remote_log_handler() -> RemoteLogIO | None: