This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 3b93650a4f6 Add bundle root to sys.path in dag processor (#50385) 3b93650a4f6 is described below commit 3b93650a4f6fd2c7607bfdc75f8a18c5c99a5d40 Author: Tzu-ping Chung <uranu...@gmail.com> AuthorDate: Tue May 13 03:03:10 2025 +0800 Add bundle root to sys.path in dag processor (#50385) --- .../src/airflow/dag_processing/processor.py | 7 +++-- airflow-core/src/airflow/settings.py | 7 ----- airflow-core/tests/unit/core/test_settings.py | 15 ++-------- .../tests/unit/dag_processing/test_processor.py | 34 +++++++++++++++++++--- 4 files changed, 37 insertions(+), 26 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 73d2c23c7f5..79b3a9816c2 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -67,8 +67,6 @@ ToDagProcessor = Annotated[ def _parse_file_entrypoint(): - import os - import structlog from airflow.sdk.execution_time import task_runner @@ -87,6 +85,11 @@ def _parse_file_entrypoint(): task_runner.SUPERVISOR_COMMS = comms_decoder log = structlog.get_logger(logger_name="task") + # Put bundle root on sys.path if needed. This allows the dag bundle to add + # code in util modules to be shared between files within the same bundle. + if (bundle_root := os.fspath(msg.bundle_path)) not in sys.path: + sys.path.append(bundle_root) + result = _parse_file(msg, log) if result is not None: comms_decoder.send_request(log, result) diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py index 076aab43e27..c137c80b56f 100644 --- a/airflow-core/src/airflow/settings.py +++ b/airflow-core/src/airflow/settings.py @@ -563,12 +563,6 @@ def prepare_syspath_for_config_and_plugins(): sys.path.append(PLUGINS_FOLDER) -def prepare_syspath_for_dags_folder(): - """Update sys.path to include the DAGs folder.""" - if DAGS_FOLDER not in sys.path: - sys.path.append(DAGS_FOLDER) - - def import_local_settings(): """Import airflow_local_settings.py files to allow overriding any configs in settings.py file.""" try: @@ -615,7 +609,6 @@ def initialize(): # in airflow_local_settings to take precendec load_policy_plugins(POLICY_PLUGIN_MANAGER) import_local_settings() - prepare_syspath_for_dags_folder() global LOGGING_CLASS_PATH LOGGING_CLASS_PATH = configure_logging() diff --git a/airflow-core/tests/unit/core/test_settings.py b/airflow-core/tests/unit/core/test_settings.py index 70fdbf9c728..c701588268f 100644 --- a/airflow-core/tests/unit/core/test_settings.py +++ b/airflow-core/tests/unit/core/test_settings.py @@ -91,16 +91,9 @@ class TestLocalSettings: @mock.patch("airflow.settings.prepare_syspath_for_config_and_plugins") @mock.patch("airflow.settings.import_local_settings") - @mock.patch("airflow.settings.prepare_syspath_for_dags_folder") - def test_initialize_order( - self, - mock_prepare_syspath_for_dags_folder, - mock_import_local_settings, - mock_prepare_syspath_for_config_and_plugins, - ): + def test_initialize_order(self, mock_import_local_settings, mock_prepare_syspath_for_config_and_plugins): """ - Tests that import_local_settings is called between prepare_syspath_for_config_and_plugins - and prepare_syspath_for_dags_folder + Tests that import_local_settings is called after prepare_syspath_for_config_and_plugins """ mock_local_settings = mock.Mock() @@ -108,9 +101,6 @@ class TestLocalSettings: mock_prepare_syspath_for_config_and_plugins, "prepare_syspath_for_config_and_plugins" ) mock_local_settings.attach_mock(mock_import_local_settings, "import_local_settings") - mock_local_settings.attach_mock( - mock_prepare_syspath_for_dags_folder, "prepare_syspath_for_dags_folder" - ) import airflow.settings @@ -119,7 +109,6 @@ class TestLocalSettings: expected_calls = [ call.prepare_syspath_for_config_and_plugins(), call.import_local_settings(), - call.prepare_syspath_for_dags_folder(), ] mock_local_settings.assert_has_calls(expected_calls) diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index a0349e0c04a..6a3eb978067 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -129,7 +129,6 @@ class TestDagFileProcessor: assert resp.import_errors is not None assert "a.py" in resp.import_errors - # @pytest.mark.execution_timeout(10) def test_top_level_variable_access( self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ): @@ -271,9 +270,7 @@ class TestDagFileProcessor: assert result.import_errors == {} assert result.serialized_dags[0].dag_id == "test_my_conn" - def test_top_level_connection_access_not_found( - self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch - ): + def test_top_level_connection_access_not_found(self, tmp_path: pathlib.Path): logger_filehandle = MagicMock() def dag_in_a_fn(): @@ -297,6 +294,35 @@ class TestDagFileProcessor: if result.import_errors: assert "CONNECTION_NOT_FOUND" in next(iter(result.import_errors.values())) + def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path): + tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'") + + dag1_path = tmp_path.joinpath("dag1.py") + dag1_code = """ + from util import NAME + + from airflow.sdk import DAG + + with DAG(NAME): + pass + """ + dag1_path.write_text(textwrap.dedent(dag1_code)) + + proc = DagFileProcessorProcess.start( + id=1, + path=dag1_path, + bundle_path=tmp_path, + callbacks=[], + logger_filehandle=MagicMock(), + ) + while not proc.is_ready: + proc._service_subprocess(0.1) + + result = proc.parsing_result + assert result is not None + assert result.import_errors == {} + assert result.serialized_dags[0].dag_id == "dag_name" + def write_dag_in_a_fn_to_file(fn: Callable[[], None], folder: pathlib.Path) -> pathlib.Path: # Create the dag in a fn, and use inspect.getsource to write it to a file so that