This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c7565caf889 Set up dag file parsing logs with structlog (#45888)
c7565caf889 is described below

commit c7565caf88947f3033d1d11d2524ae1ebdc146f0
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Wed Jan 22 16:08:33 2025 -0700

    Set up dag file parsing logs with structlog (#45888)
    
    This gets the dag file parsing logs working with structlog. This will be
    further refactored, particularly to make it configurable for end users.
    But in the meantime, this gets the logs being written again.
---
 airflow/dag_processing/manager.py    | 56 ++++++++++++++++++++++++++++++++++++
 tests/dag_processing/test_manager.py |  3 +-
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 32c141d2b71..fd63f51a0d2 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -38,6 +38,7 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Any, NamedTuple
 
 import attrs
+import structlog
 from sqlalchemy import delete, select, tuple_, update
 from tabulate import tabulate
 from uuid6 import uuid7
@@ -52,6 +53,7 @@ from airflow.models.dagbundle import DagBundleModel
 from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.errors import ParseImportError
+from airflow.sdk.log import init_log_file, logging_processors
 from airflow.secrets.cache import SecretCache
 from airflow.stats import Stats
 from airflow.traces.tracer import Trace
@@ -109,6 +111,10 @@ def _config_bool_factory(section: str, key: str):
     return functools.partial(conf.getboolean, section, key)
 
 
+def _config_get_factory(section: str, key: str):
+    return functools.partial(conf.get, section, key)
+
+
 def _resolve_path(instance: Any, attribute: attrs.Attribute, val: str | 
os.PathLike[str] | None):
     if val is not None:
         val = Path(val).resolve()
@@ -179,6 +185,9 @@ class DagFileProcessorManager:
         factory=_config_int_factory("scheduler", "max_callbacks_per_loop")
     )
 
+    base_log_dir: str = attrs.field(factory=_config_get_factory("scheduler", 
"CHILD_PROCESS_LOG_DIRECTORY"))
+    _latest_log_symlink_date: datetime = attrs.field(factory=datetime.today, 
init=False)
+
     def register_exit_signals(self):
         """Register signals that stop child processes."""
         signal.signal(signal.SIGINT, self._exit_gracefully)
@@ -216,6 +225,7 @@ class DagFileProcessorManager:
 
         self.log.info("Getting all DAG bundles")
         self._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+        self._symlink_latest_log_directory()
 
         return self._run_parsing_loop()
 
@@ -694,6 +704,51 @@ class DagFileProcessorManager:
         for dag_file in finished:
             self._processors.pop(dag_file)
 
+    def _get_log_dir(self) -> str:
+        return os.path.join(self.base_log_dir, 
timezone.utcnow().strftime("%Y-%m-%d"))
+
+    def _symlink_latest_log_directory(self):
+        """
+        Create symbolic link to the current day's log directory.
+
+        Allows easy access to the latest parsing log files.
+        """
+        log_directory = self._get_log_dir()
+        latest_log_directory_path = os.path.join(self.base_log_dir, "latest")
+        if os.path.isdir(log_directory):
+            rel_link_target = 
Path(log_directory).relative_to(Path(latest_log_directory_path).parent)
+            try:
+                # if symlink exists but is stale, update it
+                if os.path.islink(latest_log_directory_path):
+                    if os.path.realpath(latest_log_directory_path) != 
log_directory:
+                        os.unlink(latest_log_directory_path)
+                        os.symlink(rel_link_target, latest_log_directory_path)
+                elif os.path.isdir(latest_log_directory_path) or 
os.path.isfile(latest_log_directory_path):
+                    self.log.warning(
+                        "%s already exists as a dir/file. Skip creating 
symlink.", latest_log_directory_path
+                    )
+                else:
+                    os.symlink(rel_link_target, latest_log_directory_path)
+            except OSError:
+                self.log.warning("OSError while attempting to symlink the 
latest log directory")
+
+    def _render_log_filename(self, dag_file: DagFileInfo) -> str:
+        """Return an absolute path of where to log for a given dagfile."""
+        if self._latest_log_symlink_date < datetime.today():
+            self._symlink_latest_log_directory()
+            self._latest_log_symlink_date = datetime.today()
+
+        bundle = next(b for b in self._dag_bundles if b.name == 
dag_file.bundle_name)
+        relative_path = Path(dag_file.path).relative_to(bundle.path)
+        return os.path.join(self._get_log_dir(), bundle.name, 
f"{relative_path}.log")
+
+    def _get_logger_for_dag_file(self, dag_file: DagFileInfo):
+        log_filename = self._render_log_filename(dag_file)
+        log_file = init_log_file(log_filename)
+        underlying_logger = structlog.BytesLogger(log_file.open("ab"))
+        processors = logging_processors(enable_pretty_log=False)[0]
+        return structlog.wrap_logger(underlying_logger, processors=processors, 
logger_name="processor").bind()
+
     def _create_process(self, dag_file: DagFileInfo) -> 
DagFileProcessorProcess:
         id = uuid7()
 
@@ -705,6 +760,7 @@ class DagFileProcessorManager:
             path=dag_file.path,
             callbacks=callback_to_execute_for_file,
             selector=self.selector,
+            logger=self._get_logger_for_dag_file(dag_file),
         )
 
     def _start_new_processes(self):
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index ea15fc5cbf5..a1915899203 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -172,7 +172,8 @@ class TestDagFileProcessorManager:
         # Mock that only one processor exists. This processor runs with 
'file_1'
         manager._processors[file_1] = MagicMock()
         # Start New Processes
-        manager._start_new_processes()
+        with mock.patch.object(DagFileProcessorManager, "_create_process"):
+            manager._start_new_processes()
 
         # Because of the config: '[scheduler] parsing_processes = 2'
         # verify that only one extra process is created

Reply via email to