jedcunningham commented on code in PR #55894:
URL: https://github.com/apache/airflow/pull/55894#discussion_r2365619889
##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert "SIGSEGV signal handler registration failed. Not in the
main thread" in caplog.text
+ def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
Review Comment:
```suggestion
from airflow.sdk import DAG
```
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -398,12 +398,23 @@ def handler(signum, frame):
def parse(mod_name, filepath):
try:
- loader = importlib.machinery.SourceFileLoader(mod_name,
filepath)
- spec = importlib.util.spec_from_loader(mod_name, loader)
- new_module = importlib.util.module_from_spec(spec)
- sys.modules[spec.name] = new_module
- loader.exec_module(new_module)
- return [new_module]
+ # Add bundle path to sys.path if we have one
+ bundle_path_added = False
+ if self.bundle_path and str(self.bundle_path) not in sys.path:
+ sys.path.insert(0, str(self.bundle_path))
Review Comment:
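Why insert the bundle path at the front of `sys.path` (`insert(0, ...)`) rather than adding it to the back with `sys.path.append(...)`?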
##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert "SIGSEGV signal handler registration failed. Not in the
main thread" in caplog.text
+ def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
+ import sys
+ import bundle_util
+
+ with DAG('test_import', description=f"DAG with sys.path:
{sys.path}"):
+ @task
+ def mytask():
+ print(util.get_message())
+ mytask()
+ """
+ )
+ )
+ syspath_before = deepcopy(sys.path)
+ assert str(tmp_path) not in sys.path # bundle path is not in sys.path
before bundle parse
+ dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path,
include_examples=False)
+
+ # Check import was successful
+ assert len(dagbag.dags) == 1
+ assert not dagbag.import_errors
+
+ dag = dagbag.get_dag("test_import")
+ assert dag is not None
+ assert str(tmp_path) in dag.description # sys.path is enhanced during
parse
+
+ assert str(tmp_path) not in sys.path # bundle path is not preserved
in sys.path
+ assert sys.path == syspath_before # sys.path doesn't change
+
+ def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
+ import bundle_utils # typo
+
+ with DAG('test_import'):
+ @task
+ def mytask():
+ print(util.get_message())
+ mytask()
+ """
+ )
+ )
+ syspath_before = deepcopy(sys.path)
+ dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path,
include_examples=False)
+
+ assert dagbag.import_errors # Check import failed
+ assert sys.path == syspath_before # sys.path doesn't change
+
+ def test_bundle_path_none_no_syspath_manipulation(self, tmp_path, caplog):
+ """
+ Test that no sys.path manipulation occurs when bundle_path is None.
+ """
+ dag_file = tmp_path / "simple_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from datetime import datetime
+ from airflow.sdk import DAG, BaseOperator
Review Comment:
```suggestion
from airflow.sdk import DAG
```
##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert "SIGSEGV signal handler registration failed. Not in the
main thread" in caplog.text
+ def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
+ import sys
+ import bundle_util
+
+ with DAG('test_import', description=f"DAG with sys.path:
{sys.path}"):
+ @task
+ def mytask():
+ print(util.get_message())
+ mytask()
Review Comment:
```suggestion
...
```
##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert "SIGSEGV signal handler registration failed. Not in the
main thread" in caplog.text
+ def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
+ import sys
+ import bundle_util
+
+ with DAG('test_import', description=f"DAG with sys.path:
{sys.path}"):
+ @task
+ def mytask():
+ print(util.get_message())
+ mytask()
+ """
+ )
+ )
+ syspath_before = deepcopy(sys.path)
+ assert str(tmp_path) not in sys.path # bundle path is not in sys.path
before bundle parse
+ dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path,
include_examples=False)
+
+ # Check import was successful
+ assert len(dagbag.dags) == 1
+ assert not dagbag.import_errors
+
+ dag = dagbag.get_dag("test_import")
+ assert dag is not None
+ assert str(tmp_path) in dag.description # sys.path is enhanced during
parse
+
+ assert str(tmp_path) not in sys.path # bundle path is not preserved
in sys.path
+ assert sys.path == syspath_before # sys.path doesn't change
+
+ def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
+ import bundle_utils # typo
+
+ with DAG('test_import'):
+ @task
+ def mytask():
+ print(util.get_message())
+ mytask()
Review Comment:
```suggestion
...
```
##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert "SIGSEGV signal handler registration failed. Not in the
main thread" in caplog.text
+ def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
+ import sys
+ import bundle_util
+
+ with DAG('test_import', description=f"DAG with sys.path:
{sys.path}"):
+ @task
+ def mytask():
+ print(util.get_message())
+ mytask()
+ """
+ )
+ )
+ syspath_before = deepcopy(sys.path)
+ assert str(tmp_path) not in sys.path # bundle path is not in sys.path
before bundle parse
+ dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path,
include_examples=False)
+
+ # Check import was successful
+ assert len(dagbag.dags) == 1
+ assert not dagbag.import_errors
+
+ dag = dagbag.get_dag("test_import")
+ assert dag is not None
+ assert str(tmp_path) in dag.description # sys.path is enhanced during
parse
+
+ assert str(tmp_path) not in sys.path # bundle path is not preserved
in sys.path
+ assert sys.path == syspath_before # sys.path doesn't change
+
+ def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
+ import bundle_utils # typo
+
+ with DAG('test_import'):
+ @task
+ def mytask():
+ print(util.get_message())
+ mytask()
+ """
+ )
+ )
+ syspath_before = deepcopy(sys.path)
+ dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path,
include_examples=False)
+
+ assert dagbag.import_errors # Check import failed
+ assert sys.path == syspath_before # sys.path doesn't change
+
+ def test_bundle_path_none_no_syspath_manipulation(self, tmp_path, caplog):
+ """
+ Test that no sys.path manipulation occurs when bundle_path is None.
+ """
+ dag_file = tmp_path / "simple_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from datetime import datetime
+ from airflow.sdk import DAG, BaseOperator
+ import sys
+
+ dag = DAG("simple_dag", start_date=datetime(2021, 1, 1),
schedule=None, description=f"DAG with sys.path: {sys.path}")
+ t1 = BaseOperator(task_id="test_task", dag=dag)
Review Comment:
```suggestion
with DAG("simple_dag", description=f"DAG with sys.path: {sys.path}"):
    ...
```
##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert "SIGSEGV signal handler registration failed. Not in the
main thread" in caplog.text
+ def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
+ import sys
+ import bundle_util
+
+ with DAG('test_import', description=f"DAG with sys.path:
{sys.path}"):
+ @task
+ def mytask():
+ print(util.get_message())
+ mytask()
+ """
+ )
+ )
+ syspath_before = deepcopy(sys.path)
+ assert str(tmp_path) not in sys.path # bundle path is not in sys.path
before bundle parse
+ dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path,
include_examples=False)
+
+ # Check import was successful
+ assert len(dagbag.dags) == 1
+ assert not dagbag.import_errors
+
+ dag = dagbag.get_dag("test_import")
+ assert dag is not None
+ assert str(tmp_path) in dag.description # sys.path is enhanced during
parse
+
+ assert str(tmp_path) not in sys.path # bundle path is not preserved
in sys.path
+ assert sys.path == syspath_before # sys.path doesn't change
+
+ def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+ """
+ Test that bundle_path is correctly added to and removed from sys.path
during DAG import.
+ """
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow import DAG
+ from airflow.decorators import task
Review Comment:
```suggestion
from airflow.sdk import DAG
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]