This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new c7565caf889 Set up dag file parsing logs with structlog (#45888)
c7565caf889 is described below

commit c7565caf88947f3033d1d11d2524ae1ebdc146f0
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Wed Jan 22 16:08:33 2025 -0700

    Set up dag file parsing logs with structlog (#45888)

    This gets the dag file parsing logs working with structlog. This will be
    further refactored, particularly to make it configurable for end users.
    But in the meantime, this gets the logs being written again.
---
 airflow/dag_processing/manager.py    | 56 ++++++++++++++++++++++++++++++++++++
 tests/dag_processing/test_manager.py |  3 +-
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 32c141d2b71..fd63f51a0d2 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -38,6 +38,7 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Any, NamedTuple
 
 import attrs
+import structlog
 from sqlalchemy import delete, select, tuple_, update
 from tabulate import tabulate
 from uuid6 import uuid7
@@ -52,6 +53,7 @@ from airflow.models.dagbundle import DagBundleModel
 from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
 from airflow.models.errors import ParseImportError
+from airflow.sdk.log import init_log_file, logging_processors
 from airflow.secrets.cache import SecretCache
 from airflow.stats import Stats
 from airflow.traces.tracer import Trace
@@ -109,6 +111,10 @@ def _config_bool_factory(section: str, key: str):
     return functools.partial(conf.getboolean, section, key)
 
 
+def _config_get_factory(section: str, key: str):
+    return functools.partial(conf.get, section, key)
+
+
 def _resolve_path(instance: Any, attribute: attrs.Attribute, val: str | os.PathLike[str] | None):
     if val is not None:
         val = Path(val).resolve()
@@ -179,6 +185,9 @@ class DagFileProcessorManager:
         factory=_config_int_factory("scheduler", "max_callbacks_per_loop")
     )
 
+    base_log_dir: str = attrs.field(factory=_config_get_factory("scheduler", "CHILD_PROCESS_LOG_DIRECTORY"))
+    _latest_log_symlink_date: datetime = attrs.field(factory=datetime.today, init=False)
+
     def register_exit_signals(self):
         """Register signals that stop child processes."""
         signal.signal(signal.SIGINT, self._exit_gracefully)
@@ -216,6 +225,7 @@ class DagFileProcessorManager:
 
         self.log.info("Getting all DAG bundles")
         self._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+        self._symlink_latest_log_directory()
 
         return self._run_parsing_loop()
 
@@ -694,6 +704,51 @@ class DagFileProcessorManager:
         for dag_file in finished:
             self._processors.pop(dag_file)
 
+    def _get_log_dir(self) -> str:
+        return os.path.join(self.base_log_dir, timezone.utcnow().strftime("%Y-%m-%d"))
+
+    def _symlink_latest_log_directory(self):
+        """
+        Create symbolic link to the current day's log directory.
+
+        Allows easy access to the latest parsing log files.
+        """
+        log_directory = self._get_log_dir()
+        latest_log_directory_path = os.path.join(self.base_log_dir, "latest")
+        if os.path.isdir(log_directory):
+            rel_link_target = Path(log_directory).relative_to(Path(latest_log_directory_path).parent)
+            try:
+                # if symlink exists but is stale, update it
+                if os.path.islink(latest_log_directory_path):
+                    if os.path.realpath(latest_log_directory_path) != log_directory:
+                        os.unlink(latest_log_directory_path)
+                        os.symlink(rel_link_target, latest_log_directory_path)
+                elif os.path.isdir(latest_log_directory_path) or os.path.isfile(latest_log_directory_path):
+                    self.log.warning(
+                        "%s already exists as a dir/file. Skip creating symlink.", latest_log_directory_path
+                    )
+                else:
+                    os.symlink(rel_link_target, latest_log_directory_path)
+            except OSError:
+                self.log.warning("OSError while attempting to symlink the latest log directory")
+
+    def _render_log_filename(self, dag_file: DagFileInfo) -> str:
+        """Return an absolute path of where to log for a given dagfile."""
+        if self._latest_log_symlink_date < datetime.today():
+            self._symlink_latest_log_directory()
+            self._latest_log_symlink_date = datetime.today()
+
+        bundle = next(b for b in self._dag_bundles if b.name == dag_file.bundle_name)
+        relative_path = Path(dag_file.path).relative_to(bundle.path)
+        return os.path.join(self._get_log_dir(), bundle.name, f"{relative_path}.log")
+
+    def _get_logger_for_dag_file(self, dag_file: DagFileInfo):
+        log_filename = self._render_log_filename(dag_file)
+        log_file = init_log_file(log_filename)
+        underlying_logger = structlog.BytesLogger(log_file.open("ab"))
+        processors = logging_processors(enable_pretty_log=False)[0]
+        return structlog.wrap_logger(underlying_logger, processors=processors, logger_name="processor").bind()
+
     def _create_process(self, dag_file: DagFileInfo) -> DagFileProcessorProcess:
         id = uuid7()
 
@@ -705,6 +760,7 @@ class DagFileProcessorManager:
             path=dag_file.path,
             callbacks=callback_to_execute_for_file,
             selector=self.selector,
+            logger=self._get_logger_for_dag_file(dag_file),
         )
 
     def _start_new_processes(self):
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index ea15fc5cbf5..a1915899203 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -172,7 +172,8 @@ class TestDagFileProcessorManager:
         # Mock that only one processor exists. This processor runs with 'file_1'
         manager._processors[file_1] = MagicMock()
         # Start New Processes
-        manager._start_new_processes()
+        with mock.patch.object(DagFileProcessorManager, "_create_process"):
+            manager._start_new_processes()
 
         # Because of the config: '[scheduler] parsing_processes = 2'
         # verify that only one extra process is created