jedcunningham commented on code in PR #55894:
URL: https://github.com/apache/airflow/pull/55894#discussion_r2365619889


##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
 
+    def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task

Review Comment:
   ```suggestion
                   from airflow.sdk import DAG
   ```



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -398,12 +398,23 @@ def handler(signum, frame):
 
         def parse(mod_name, filepath):
             try:
-                loader = importlib.machinery.SourceFileLoader(mod_name, 
filepath)
-                spec = importlib.util.spec_from_loader(mod_name, loader)
-                new_module = importlib.util.module_from_spec(spec)
-                sys.modules[spec.name] = new_module
-                loader.exec_module(new_module)
-                return [new_module]
+                # Add bundle path to sys.path if we have one
+                bundle_path_added = False
+                if self.bundle_path and str(self.bundle_path) not in sys.path:
+                    sys.path.insert(0, str(self.bundle_path))

Review Comment:
   Why insert the bundle path at the front of `sys.path`? Why not add it at the end with `sys.path.append`?



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
 
+    def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task
+                import sys
+                import bundle_util
+
+                with DAG('test_import', description=f"DAG with sys.path: 
{sys.path}"):
+                    @task
+                    def mytask():
+                        print(util.get_message())
+                    mytask()
+                """
+            )
+        )
+        syspath_before = deepcopy(sys.path)
+        assert str(tmp_path) not in sys.path  # bundle path is not in sys.path 
before bundle parse
+        dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, 
include_examples=False)
+
+        # Check import was successful
+        assert len(dagbag.dags) == 1
+        assert not dagbag.import_errors
+
+        dag = dagbag.get_dag("test_import")
+        assert dag is not None
+        assert str(tmp_path) in dag.description  # sys.path is enhanced during 
parse
+
+        assert str(tmp_path) not in sys.path  # bundle path is not preserved 
in sys.path
+        assert sys.path == syspath_before  # sys.path doesn't change
+
+    def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task
+                import bundle_utils # typo
+
+                with DAG('test_import'):
+                    @task
+                    def mytask():
+                        print(util.get_message())
+                    mytask()
+                """
+            )
+        )
+        syspath_before = deepcopy(sys.path)
+        dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, 
include_examples=False)
+
+        assert dagbag.import_errors  # Check import failed
+        assert sys.path == syspath_before  # sys.path doesn't change
+
+    def test_bundle_path_none_no_syspath_manipulation(self, tmp_path, caplog):
+        """
+        Test that no sys.path manipulation occurs when bundle_path is None.
+        """
+        dag_file = tmp_path / "simple_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+            from datetime import datetime
+            from airflow.sdk import DAG, BaseOperator

Review Comment:
   ```suggestion
               from airflow.sdk import DAG
   ```



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
 
+    def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task
+                import sys
+                import bundle_util
+
+                with DAG('test_import', description=f"DAG with sys.path: 
{sys.path}"):
+                    @task
+                    def mytask():
+                        print(util.get_message())
+                    mytask()

Review Comment:
   ```suggestion
                       ...
   ```



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
 
+    def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task
+                import sys
+                import bundle_util
+
+                with DAG('test_import', description=f"DAG with sys.path: 
{sys.path}"):
+                    @task
+                    def mytask():
+                        print(util.get_message())
+                    mytask()
+                """
+            )
+        )
+        syspath_before = deepcopy(sys.path)
+        assert str(tmp_path) not in sys.path  # bundle path is not in sys.path 
before bundle parse
+        dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, 
include_examples=False)
+
+        # Check import was successful
+        assert len(dagbag.dags) == 1
+        assert not dagbag.import_errors
+
+        dag = dagbag.get_dag("test_import")
+        assert dag is not None
+        assert str(tmp_path) in dag.description  # sys.path is enhanced during 
parse
+
+        assert str(tmp_path) not in sys.path  # bundle path is not preserved 
in sys.path
+        assert sys.path == syspath_before  # sys.path doesn't change
+
+    def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task
+                import bundle_utils # typo
+
+                with DAG('test_import'):
+                    @task
+                    def mytask():
+                        print(util.get_message())
+                    mytask()

Review Comment:
   ```suggestion
                       ...
   ```



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
 
+    def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task
+                import sys
+                import bundle_util
+
+                with DAG('test_import', description=f"DAG with sys.path: 
{sys.path}"):
+                    @task
+                    def mytask():
+                        print(util.get_message())
+                    mytask()
+                """
+            )
+        )
+        syspath_before = deepcopy(sys.path)
+        assert str(tmp_path) not in sys.path  # bundle path is not in sys.path 
before bundle parse
+        dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, 
include_examples=False)
+
+        # Check import was successful
+        assert len(dagbag.dags) == 1
+        assert not dagbag.import_errors
+
+        dag = dagbag.get_dag("test_import")
+        assert dag is not None
+        assert str(tmp_path) in dag.description  # sys.path is enhanced during 
parse
+
+        assert str(tmp_path) not in sys.path  # bundle path is not preserved 
in sys.path
+        assert sys.path == syspath_before  # sys.path doesn't change
+
+    def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task
+                import bundle_utils # typo
+
+                with DAG('test_import'):
+                    @task
+                    def mytask():
+                        print(util.get_message())
+                    mytask()
+                """
+            )
+        )
+        syspath_before = deepcopy(sys.path)
+        dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, 
include_examples=False)
+
+        assert dagbag.import_errors  # Check import failed
+        assert sys.path == syspath_before  # sys.path doesn't change
+
+    def test_bundle_path_none_no_syspath_manipulation(self, tmp_path, caplog):
+        """
+        Test that no sys.path manipulation occurs when bundle_path is None.
+        """
+        dag_file = tmp_path / "simple_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+            from datetime import datetime
+            from airflow.sdk import DAG, BaseOperator
+            import sys
+
+            dag = DAG("simple_dag", start_date=datetime(2021, 1, 1), 
schedule=None, description=f"DAG with sys.path: {sys.path}")
+            t1 = BaseOperator(task_id="test_task", dag=dag)

Review Comment:
   ```suggestion
               with DAG("simple_dag", description=f"DAG with sys.path: {sys.path}"):
                   ...
   ```



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,98 @@ def mytask():
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
 
+    def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task
+                import sys
+                import bundle_util
+
+                with DAG('test_import', description=f"DAG with sys.path: 
{sys.path}"):
+                    @task
+                    def mytask():
+                        print(util.get_message())
+                    mytask()
+                """
+            )
+        )
+        syspath_before = deepcopy(sys.path)
+        assert str(tmp_path) not in sys.path  # bundle path is not in sys.path 
before bundle parse
+        dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, 
include_examples=False)
+
+        # Check import was successful
+        assert len(dagbag.dags) == 1
+        assert not dagbag.import_errors
+
+        dag = dagbag.get_dag("test_import")
+        assert dag is not None
+        assert str(tmp_path) in dag.description  # sys.path is enhanced during 
parse
+
+        assert str(tmp_path) not in sys.path  # bundle path is not preserved 
in sys.path
+        assert sys.path == syspath_before  # sys.path doesn't change
+
+    def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow import DAG
+                from airflow.decorators import task

Review Comment:
   ```suggestion
                   from airflow.sdk import DAG
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to