This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f93fef8b8d1af74ac4a8553c7f6d41ef7fd87356 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Tue Sep 9 20:13:34 2025 +0100 Fix Dag processor crashing under `airflow standalone` (#55434) I'm not sure what `breeze start-airflow` wasn't exhibiting this and `airflow standalone` was, but the need to reconfigure stdlib logging has gone away with the move to struct log, so this is the best kind of fix -- deleteing code! --- .../airflow/cli/commands/dag_processor_command.py | 3 +-- airflow-core/src/airflow/dag_processing/manager.py | 26 ---------------------- 2 files changed, 1 insertion(+), 28 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/dag_processor_command.py b/airflow-core/src/airflow/cli/commands/dag_processor_command.py index c866763338f..9f054eb8467 100644 --- a/airflow-core/src/airflow/cli/commands/dag_processor_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_processor_command.py @@ -22,7 +22,7 @@ import logging from typing import Any from airflow.cli.commands.daemon_utils import run_command_with_daemon_option -from airflow.dag_processing.manager import DagFileProcessorManager, reload_configuration_for_dag_processing +from airflow.dag_processing.manager import DagFileProcessorManager from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner from airflow.jobs.job import Job, run_job from airflow.utils import cli as cli_utils @@ -50,7 +50,6 @@ def dag_processor(args): """Start Airflow Dag Processor Job.""" job_runner = _create_dag_processor_job_runner(args) - reload_configuration_for_dag_processing() run_command_with_daemon_option( args=args, process_name="dag-processor", diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 55cb41610b0..cf21c28f85e 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -21,7 +21,6 @@ from __future__ import annotations import contextlib import functools -import importlib import inspect import logging import os @@ -35,7 +34,6 @@ from collections import defaultdict, deque from collections.abc import Callable, Iterable, Iterator from dataclasses import dataclass, field from datetime import datetime, timedelta -from importlib import import_module from operator import attrgetter, itemgetter from pathlib import Path from typing import TYPE_CHECKING, Any, NamedTuple, cast @@ -47,7 +45,6 @@ from sqlalchemy.orm import load_only from tabulate import tabulate from uuid6 import uuid7 -import airflow.models from airflow._shared.timezones import timezone from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI from airflow.configuration import conf @@ -1102,29 +1099,6 @@ class DagFileProcessorManager(LoggingMixin): ) -def reload_configuration_for_dag_processing(): - # Reload configurations and settings to avoid collision with parent process. - # Because this process may need custom configurations that cannot be shared, - # e.g. RotatingFileHandler. And it can cause connection corruption if we - # do not recreate the SQLA connection pool. - os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True" - os.environ["AIRFLOW__LOGGING__COLORED_CONSOLE_LOG"] = "False" - # Replicating the behavior of how logging module was loaded - # in logging_config.py - # TODO: This reloading should be removed when we fix our logging behaviour - # In case of "spawn" method of starting processes for multiprocessing, reinitializing of the - # SQLAlchemy engine causes extremely unexpected behaviour of messing with objects already loaded - # in a parent process (likely via resources shared in memory by the ORM libraries). - # This caused flaky tests in our CI for many months and has been discovered while - # iterating on https://github.com/apache/airflow/pull/19860 - # The issue that describes the problem and possible remediation is - # at https://github.com/apache/airflow/issues/19934 - importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".", 1)[0])) - importlib.reload(airflow.settings) - airflow.settings.initialize() - del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] - - def process_parse_results( run_duration: float, finish_time: datetime,
