This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new b4c84168c2a [v3-0-test] Add bundle root to sys.path in dag processor 
(#50385) (#50509)
b4c84168c2a is described below

commit b4c84168c2a0bdb53e8d8f4f6bccf3a53791aa70
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon May 12 13:37:15 2025 -0600

    [v3-0-test] Add bundle root to sys.path in dag processor (#50385) (#50509)
    
    (cherry picked from commit 3b93650a4f6fd2c7607bfdc75f8a18c5c99a5d40)
    
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
---
 .../src/airflow/dag_processing/processor.py        |  7 +++--
 airflow-core/src/airflow/settings.py               |  7 -----
 airflow-core/tests/unit/core/test_settings.py      | 15 ++--------
 .../tests/unit/dag_processing/test_processor.py    | 34 +++++++++++++++++++---
 4 files changed, 37 insertions(+), 26 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/processor.py 
b/airflow-core/src/airflow/dag_processing/processor.py
index 73d2c23c7f5..79b3a9816c2 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -67,8 +67,6 @@ ToDagProcessor = Annotated[
 
 
 def _parse_file_entrypoint():
-    import os
-
     import structlog
 
     from airflow.sdk.execution_time import task_runner
@@ -87,6 +85,11 @@ def _parse_file_entrypoint():
     task_runner.SUPERVISOR_COMMS = comms_decoder
     log = structlog.get_logger(logger_name="task")
 
+    # Put bundle root on sys.path if needed. This allows the dag bundle to add
+    # code in util modules to be shared between files within the same bundle.
+    if (bundle_root := os.fspath(msg.bundle_path)) not in sys.path:
+        sys.path.append(bundle_root)
+
     result = _parse_file(msg, log)
     if result is not None:
         comms_decoder.send_request(log, result)
diff --git a/airflow-core/src/airflow/settings.py 
b/airflow-core/src/airflow/settings.py
index ce36f1e676a..0056b93497a 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -560,12 +560,6 @@ def prepare_syspath_for_config_and_plugins():
         sys.path.append(PLUGINS_FOLDER)
 
 
-def prepare_syspath_for_dags_folder():
-    """Update sys.path to include the DAGs folder."""
-    if DAGS_FOLDER not in sys.path:
-        sys.path.append(DAGS_FOLDER)
-
-
 def import_local_settings():
     """Import airflow_local_settings.py files to allow overriding any configs 
in settings.py file."""
     try:
@@ -612,7 +606,6 @@ def initialize():
     # in airflow_local_settings to take precedence
     load_policy_plugins(POLICY_PLUGIN_MANAGER)
     import_local_settings()
-    prepare_syspath_for_dags_folder()
     global LOGGING_CLASS_PATH
     LOGGING_CLASS_PATH = configure_logging()
 
diff --git a/airflow-core/tests/unit/core/test_settings.py 
b/airflow-core/tests/unit/core/test_settings.py
index 70fdbf9c728..c701588268f 100644
--- a/airflow-core/tests/unit/core/test_settings.py
+++ b/airflow-core/tests/unit/core/test_settings.py
@@ -91,16 +91,9 @@ class TestLocalSettings:
 
     @mock.patch("airflow.settings.prepare_syspath_for_config_and_plugins")
     @mock.patch("airflow.settings.import_local_settings")
-    @mock.patch("airflow.settings.prepare_syspath_for_dags_folder")
-    def test_initialize_order(
-        self,
-        mock_prepare_syspath_for_dags_folder,
-        mock_import_local_settings,
-        mock_prepare_syspath_for_config_and_plugins,
-    ):
+    def test_initialize_order(self, mock_import_local_settings, 
mock_prepare_syspath_for_config_and_plugins):
         """
-        Tests that import_local_settings is called between 
prepare_syspath_for_config_and_plugins
-        and prepare_syspath_for_dags_folder
+        Tests that import_local_settings is called after 
prepare_syspath_for_config_and_plugins
         """
         mock_local_settings = mock.Mock()
 
@@ -108,9 +101,6 @@ class TestLocalSettings:
             mock_prepare_syspath_for_config_and_plugins, 
"prepare_syspath_for_config_and_plugins"
         )
         mock_local_settings.attach_mock(mock_import_local_settings, 
"import_local_settings")
-        mock_local_settings.attach_mock(
-            mock_prepare_syspath_for_dags_folder, 
"prepare_syspath_for_dags_folder"
-        )
 
         import airflow.settings
 
@@ -119,7 +109,6 @@ class TestLocalSettings:
         expected_calls = [
             call.prepare_syspath_for_config_and_plugins(),
             call.import_local_settings(),
-            call.prepare_syspath_for_dags_folder(),
         ]
 
         mock_local_settings.assert_has_calls(expected_calls)
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py 
b/airflow-core/tests/unit/dag_processing/test_processor.py
index a0349e0c04a..6a3eb978067 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -129,7 +129,6 @@ class TestDagFileProcessor:
         assert resp.import_errors is not None
         assert "a.py" in resp.import_errors
 
-    # @pytest.mark.execution_timeout(10)
     def test_top_level_variable_access(
         self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch: 
pytest.MonkeyPatch
     ):
@@ -271,9 +270,7 @@ class TestDagFileProcessor:
         assert result.import_errors == {}
         assert result.serialized_dags[0].dag_id == "test_my_conn"
 
-    def test_top_level_connection_access_not_found(
-        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
-    ):
+    def test_top_level_connection_access_not_found(self, tmp_path: 
pathlib.Path):
         logger_filehandle = MagicMock()
 
         def dag_in_a_fn():
@@ -297,6 +294,35 @@ class TestDagFileProcessor:
         if result.import_errors:
             assert "CONNECTION_NOT_FOUND" in 
next(iter(result.import_errors.values()))
 
+    def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path):
+        tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")
+
+        dag1_path = tmp_path.joinpath("dag1.py")
+        dag1_code = """
+        from util import NAME
+
+        from airflow.sdk import DAG
+
+        with DAG(NAME):
+            pass
+        """
+        dag1_path.write_text(textwrap.dedent(dag1_code))
+
+        proc = DagFileProcessorProcess.start(
+            id=1,
+            path=dag1_path,
+            bundle_path=tmp_path,
+            callbacks=[],
+            logger_filehandle=MagicMock(),
+        )
+        while not proc.is_ready:
+            proc._service_subprocess(0.1)
+
+        result = proc.parsing_result
+        assert result is not None
+        assert result.import_errors == {}
+        assert result.serialized_dags[0].dag_id == "dag_name"
+
 
 def write_dag_in_a_fn_to_file(fn: Callable[[], None], folder: pathlib.Path) -> 
pathlib.Path:
     # Create the dag in a fn, and use inspect.getsource to write it to a file 
so that

Reply via email to