This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3b93650a4f6 Add bundle root to sys.path in dag processor (#50385)
3b93650a4f6 is described below
commit 3b93650a4f6fd2c7607bfdc75f8a18c5c99a5d40
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue May 13 03:03:10 2025 +0800
Add bundle root to sys.path in dag processor (#50385)
---
.../src/airflow/dag_processing/processor.py | 7 +++--
airflow-core/src/airflow/settings.py | 7 -----
airflow-core/tests/unit/core/test_settings.py | 15 ++--------
.../tests/unit/dag_processing/test_processor.py | 34 +++++++++++++++++++---
4 files changed, 37 insertions(+), 26 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 73d2c23c7f5..79b3a9816c2 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -67,8 +67,6 @@ ToDagProcessor = Annotated[
def _parse_file_entrypoint():
- import os
-
import structlog
from airflow.sdk.execution_time import task_runner
@@ -87,6 +85,11 @@ def _parse_file_entrypoint():
task_runner.SUPERVISOR_COMMS = comms_decoder
log = structlog.get_logger(logger_name="task")
+ # Put bundle root on sys.path if needed. This allows the dag bundle to add
+ # code in util modules to be shared between files within the same bundle.
+ if (bundle_root := os.fspath(msg.bundle_path)) not in sys.path:
+ sys.path.append(bundle_root)
+
result = _parse_file(msg, log)
if result is not None:
comms_decoder.send_request(log, result)
diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py
index 076aab43e27..c137c80b56f 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -563,12 +563,6 @@ def prepare_syspath_for_config_and_plugins():
sys.path.append(PLUGINS_FOLDER)
-def prepare_syspath_for_dags_folder():
- """Update sys.path to include the DAGs folder."""
- if DAGS_FOLDER not in sys.path:
- sys.path.append(DAGS_FOLDER)
-
-
def import_local_settings():
"""Import airflow_local_settings.py files to allow overriding any configs
in settings.py file."""
try:
@@ -615,7 +609,6 @@ def initialize():
# in airflow_local_settings to take precendec
load_policy_plugins(POLICY_PLUGIN_MANAGER)
import_local_settings()
- prepare_syspath_for_dags_folder()
global LOGGING_CLASS_PATH
LOGGING_CLASS_PATH = configure_logging()
diff --git a/airflow-core/tests/unit/core/test_settings.py b/airflow-core/tests/unit/core/test_settings.py
index 70fdbf9c728..c701588268f 100644
--- a/airflow-core/tests/unit/core/test_settings.py
+++ b/airflow-core/tests/unit/core/test_settings.py
@@ -91,16 +91,9 @@ class TestLocalSettings:
@mock.patch("airflow.settings.prepare_syspath_for_config_and_plugins")
@mock.patch("airflow.settings.import_local_settings")
- @mock.patch("airflow.settings.prepare_syspath_for_dags_folder")
- def test_initialize_order(
- self,
- mock_prepare_syspath_for_dags_folder,
- mock_import_local_settings,
- mock_prepare_syspath_for_config_and_plugins,
- ):
+ def test_initialize_order(self, mock_import_local_settings,
mock_prepare_syspath_for_config_and_plugins):
"""
- Tests that import_local_settings is called between
prepare_syspath_for_config_and_plugins
- and prepare_syspath_for_dags_folder
+ Tests that import_local_settings is called after
prepare_syspath_for_config_and_plugins
"""
mock_local_settings = mock.Mock()
@@ -108,9 +101,6 @@ class TestLocalSettings:
mock_prepare_syspath_for_config_and_plugins,
"prepare_syspath_for_config_and_plugins"
)
mock_local_settings.attach_mock(mock_import_local_settings,
"import_local_settings")
- mock_local_settings.attach_mock(
- mock_prepare_syspath_for_dags_folder,
"prepare_syspath_for_dags_folder"
- )
import airflow.settings
@@ -119,7 +109,6 @@ class TestLocalSettings:
expected_calls = [
call.prepare_syspath_for_config_and_plugins(),
call.import_local_settings(),
- call.prepare_syspath_for_dags_folder(),
]
mock_local_settings.assert_has_calls(expected_calls)
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index a0349e0c04a..6a3eb978067 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -129,7 +129,6 @@ class TestDagFileProcessor:
assert resp.import_errors is not None
assert "a.py" in resp.import_errors
- # @pytest.mark.execution_timeout(10)
def test_top_level_variable_access(
self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch:
pytest.MonkeyPatch
):
@@ -271,9 +270,7 @@ class TestDagFileProcessor:
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "test_my_conn"
- def test_top_level_connection_access_not_found(
- self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
- ):
+ def test_top_level_connection_access_not_found(self, tmp_path:
pathlib.Path):
logger_filehandle = MagicMock()
def dag_in_a_fn():
@@ -297,6 +294,35 @@ class TestDagFileProcessor:
if result.import_errors:
assert "CONNECTION_NOT_FOUND" in
next(iter(result.import_errors.values()))
+ def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path):
+ tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")
+
+ dag1_path = tmp_path.joinpath("dag1.py")
+ dag1_code = """
+ from util import NAME
+
+ from airflow.sdk import DAG
+
+ with DAG(NAME):
+ pass
+ """
+ dag1_path.write_text(textwrap.dedent(dag1_code))
+
+ proc = DagFileProcessorProcess.start(
+ id=1,
+ path=dag1_path,
+ bundle_path=tmp_path,
+ callbacks=[],
+ logger_filehandle=MagicMock(),
+ )
+ while not proc.is_ready:
+ proc._service_subprocess(0.1)
+
+ result = proc.parsing_result
+ assert result is not None
+ assert result.import_errors == {}
+ assert result.serialized_dags[0].dag_id == "dag_name"
+
def write_dag_in_a_fn_to_file(fn: Callable[[], None], folder: pathlib.Path) ->
pathlib.Path:
# Create the dag in a fn, and use inspect.getsource to write it to a file
so that