This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c22c58d1aa9 Fixup to structlog migration - logging folder not being created correctly. (#55431)
c22c58d1aa9 is described below

commit c22c58d1aa9596b5bc084a6769e1474a3b1d8d3a
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Sep 9 20:59:49 2025 +0100

    Fixup to structlog migration - logging folder not being created correctly. (#55431)
    
    This was broken in #52651 with our move away from FileTaskHandler, and was hidden
    when running in Breeze because the default logs folder already existed.
---
 airflow-core/src/airflow/logging_config.py         | 14 +++++-
 .../logging/src/airflow_shared/logging/__init__.py |  8 ++-
 .../src/airflow_shared/logging/structlog.py        | 57 ++++++++++++++++++++++
 task-sdk/src/airflow/sdk/log.py                    | 50 +++----------------
 4 files changed, 84 insertions(+), 45 deletions(-)

diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py
index cce8b37f684..d4a77782c34 100644
--- a/airflow-core/src/airflow/logging_config.py
+++ b/airflow-core/src/airflow/logging_config.py
@@ -85,7 +85,7 @@ def load_logging_config() -> tuple[dict[str, Any], str]:
 
 
 def configure_logging():
-    from airflow._shared.logging.structlog import configure_logging
+    from airflow._shared.logging import configure_logging, init_log_folder
 
     logging_config, logging_class_path = load_logging_config()
     try:
@@ -100,6 +100,18 @@ def configure_logging():
 
     validate_logging_config()
 
+    new_folder_permissions = int(
+        conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"),
+        8,
+    )
+
+    base_log_folder = conf.get("logging", "base_log_folder")
+    # Since #52651 moved away from FileTaskHandler, the base log folder must be created explicitly.
+    init_log_folder(
+        base_log_folder,
+        new_folder_permissions=new_folder_permissions,
+    )
+
     return logging_class_path
 
 
diff --git a/shared/logging/src/airflow_shared/logging/__init__.py b/shared/logging/src/airflow_shared/logging/__init__.py
index 8f1cd395396..274c404a5c4 100644
--- a/shared/logging/src/airflow_shared/logging/__init__.py
+++ b/shared/logging/src/airflow_shared/logging/__init__.py
@@ -16,4 +16,10 @@
 # under the License.
 from __future__ import annotations
 
-from .structlog import configure_logging as configure_logging
+__all__ = [
+    "configure_logging",
+    "init_log_file",
+    "init_log_folder",
+]
+
+from .structlog import configure_logging, init_log_file, init_log_folder
diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py
index 8510520474a..6e66c50ab60 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -25,6 +25,7 @@ import re
 import sys
 from collections.abc import Callable, Mapping, Sequence
 from functools import cache, cached_property, partial
+from pathlib import Path
 from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast
 
 import pygtrie
@@ -505,6 +506,62 @@ def configure_logging(
     logging.config.dictConfig(config)
 
 
+def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: int) -> None:
+    """
+    Prepare the log folder and ensure its mode is as configured.
+
+    To handle log writing when tasks are impersonated, the log files need to
+    be writable by the user that runs the Airflow command and the user
+    that is impersonated. This is mainly to handle corner cases with the
+    SubDagOperator. When the SubDagOperator is run, all of the operators
+    run under the impersonated user and create appropriate log files
+    as the impersonated user. However, if the user manually runs tasks
+    of the SubDagOperator through the UI, then the log files are created
+    by the user that runs the Airflow command. For example, the Airflow
+    run command may be run by the `airflow_sudoable` user, but the Airflow
+    tasks may be run by the `airflow` user. If the log files are not
+    writable by both users, then it's possible that re-running a task
+    via the UI (or vice versa) results in a permission error as the task
+    tries to write to a log file created by the other user.
+
+    We leave it up to the user to manage their permissions by exposing configuration for both
+    new folders and new log files. Default is to make new log folders and files group-writeable
+    to handle most common impersonation use cases. The requirement in this case will be to make
+    sure that the same group is set as default group for both - impersonated user and main airflow
+    user.
+    """
+    directory = Path(directory)
+    for parent in reversed(directory.parents):  # mkdir(parents=True) would ignore mode for parents
+        parent.mkdir(mode=new_folder_permissions, exist_ok=True)
+    directory.mkdir(mode=new_folder_permissions, exist_ok=True)
+
+
+def init_log_file(
+    base_log_folder: str | os.PathLike[str],
+    local_relative_path: str | os.PathLike[str],
+    *,
+    new_folder_permissions: int = 0o775,
+    new_file_permissions: int = 0o664,
+) -> Path:
+    """
+    Ensure log file and parent directories are created with the correct permissions.
+
+    Any directories that are missing are created with the right permission bits.
+
+    See above ``init_log_folder`` method for more detailed explanation.
+    """
+    full_path = Path(base_log_folder, local_relative_path)
+    init_log_folder(full_path.parent, new_folder_permissions)
+
+    try:
+        full_path.touch(new_file_permissions)  # mode only takes effect if the file is newly created
+    except OSError as e:
+        log = structlog.get_logger(__name__)
+        log.warning("OSError while changing ownership of the log file. %s", e)
+
+    return full_path
+
+
 if __name__ == "__main__":
     configure_logging(
         # json_output=True,
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index d57867a6ef9..28b51d08d6d 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -222,46 +222,14 @@ def logger_at_level(name: str, level: int) -> Logger:
     )
 
 
-def _prepare_log_folder(directory: Path, mode: int):
-    """
-    Prepare the log folder and ensure its mode is as configured.
-
-    To handle log writing when tasks are impersonated, the log files need to
-    be writable by the user that runs the Airflow command and the user
-    that is impersonated. This is mainly to handle corner cases with the
-    SubDagOperator. When the SubDagOperator is run, all of the operators
-    run under the impersonated user and create appropriate log files
-    as the impersonated user. However, if the user manually runs tasks
-    of the SubDagOperator through the UI, then the log files are created
-    by the user that runs the Airflow command. For example, the Airflow
-    run command may be run by the `airflow_sudoable` user, but the Airflow
-    tasks may be run by the `airflow` user. If the log files are not
-    writable by both users, then it's possible that re-running a task
-    via the UI (or vice versa) results in a permission error as the task
-    tries to write to a log file created by the other user.
-
-    We leave it up to the user to manage their permissions by exposing configuration for both
-    new folders and new log files. Default is to make new log folders and files group-writeable
-    to handle most common impersonation use cases. The requirement in this case will be to make
-    sure that the same group is set as default group for both - impersonated user and main airflow
-    user.
-    """
-    for parent in reversed(directory.parents):
-        parent.mkdir(mode=mode, exist_ok=True)
-    directory.mkdir(mode=mode, exist_ok=True)
-
-
-
 def init_log_file(local_relative_path: str) -> Path:
     """
     Ensure log file and parent directories are created.
 
     Any directories that are missing are created with the right permission bits.
-
-    See above ``_prepare_log_folder`` method for more detailed explanation.
     """
-    # NOTE: This is duplicated from airflow.utils.log.file_task_handler:FileTaskHandler._init_file, but we
-    # want to remove that
     from airflow.configuration import conf
+    from airflow.sdk._shared.logging import init_log_file as _init_log_file
 
     new_file_permissions = int(
         conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"),
@@ -273,17 +241,13 @@ def init_log_file(local_relative_path: str) -> Path:
     )
 
     base_log_folder = conf.get("logging", "base_log_folder")
-    full_path = Path(base_log_folder, local_relative_path)
 
-    _prepare_log_folder(full_path.parent, new_folder_permissions)
-
-    try:
-        full_path.touch(new_file_permissions)
-    except OSError as e:
-        log = structlog.get_logger(__name__)
-        log.warning("OSError while changing ownership of the log file. %s", e)
-
-    return full_path
+    return _init_log_file(
+        base_log_folder,
+        local_relative_path,
+        new_folder_permissions=new_folder_permissions,
+        new_file_permissions=new_file_permissions,
+    )
 
 
 def load_remote_log_handler() -> RemoteLogIO | None:

Reply via email to