jedcunningham commented on code in PR #55894:
URL: https://github.com/apache/airflow/pull/55894#discussion_r2376039076


##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,95 @@ def mytask():
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
 
+    def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow.sdk import DAG
+                from airflow.operators.empty import EmptyOperator
+
+                import sys
+                import bundle_util
+
+                with DAG('test_import', description=f"DAG with sys.path: 
{sys.path}"):
+                    EmptyOperator(task_id="mytask")
+                """
+            )
+        )
+        syspath_before = deepcopy(sys.path)
+        assert str(tmp_path) not in sys.path  # bundle path is not in sys.path 
before bundle parse
+        dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, 
include_examples=False)
+
+        # Check import was successful
+        assert len(dagbag.dags) == 1
+        assert not dagbag.import_errors
+
+        dag = dagbag.get_dag("test_import")
+        assert dag is not None
+        assert str(tmp_path) in dag.description  # sys.path is enhanced during 
parse
+
+        assert str(tmp_path) not in sys.path  # bundle path is not preserved 
in sys.path
+        assert sys.path == syspath_before  # sys.path doesn't change
+
+    def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow.sdk import DAG
+                from airflow.operators.empty import EmptyOperator

Review Comment:
   ```suggestion
                   from airflow.providers.standard.operators.empty import 
EmptyOperator
   ```



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,95 @@ def mytask():
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
 
+    def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow.sdk import DAG
+                from airflow.operators.empty import EmptyOperator
+
+                import sys
+                import bundle_util
+
+                with DAG('test_import', description=f"DAG with sys.path: 
{sys.path}"):
+                    EmptyOperator(task_id="mytask")
+                """
+            )
+        )
+        syspath_before = deepcopy(sys.path)
+        assert str(tmp_path) not in sys.path  # bundle path is not in sys.path 
before bundle parse
+        dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, 
include_examples=False)
+
+        # Check import was successful
+        assert len(dagbag.dags) == 1
+        assert not dagbag.import_errors
+
+        dag = dagbag.get_dag("test_import")
+        assert dag is not None
+        assert str(tmp_path) in dag.description  # sys.path is enhanced during 
parse
+
+        assert str(tmp_path) not in sys.path  # bundle path is not preserved 
in sys.path
+        assert sys.path == syspath_before  # sys.path doesn't change
+
+    def test_bundle_path_syspath_is_cleanup_on_failure(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow.sdk import DAG
+                from airflow.operators.empty import EmptyOperator
+
+                import bundle_utils # typo
+
+                with DAG('test_import'):
+                    EmptyOperator(task_id="mytask")
+                """
+            )
+        )
+        syspath_before = deepcopy(sys.path)
+        dagbag = DagBag(dag_folder=str(dag_file), bundle_path=tmp_path, 
include_examples=False)
+
+        assert dagbag.import_errors  # Check import failed
+        assert sys.path == syspath_before  # sys.path doesn't change
+
+    def test_bundle_path_none_no_syspath_manipulation(self, tmp_path, caplog):
+        """
+        Test that no sys.path manipulation occurs when bundle_path is None.
+        """
+        dag_file = tmp_path / "simple_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow.sdk import DAG
+                from airflow.operators.empty import EmptyOperator

Review Comment:
   ```suggestion
                   from airflow.providers.standard.operators.empty import 
EmptyOperator
   ```



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -866,6 +866,95 @@ def mytask():
             DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
             assert "SIGSEGV signal handler registration failed. Not in the 
main thread" in caplog.text
 
+    def test_bundle_path_is_in_syspath_during_parse(self, tmp_path):
+        """
+        Test that bundle_path is correctly added to and removed from sys.path 
during DAG import.
+        """
+        util_file = tmp_path / "bundle_util.py"
+        util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+        dag_file = tmp_path / "test_dag.py"
+        dag_file.write_text(
+            textwrap.dedent(
+                """\
+                from airflow.sdk import DAG
+                from airflow.operators.empty import EmptyOperator

Review Comment:
   ```suggestion
                   from airflow.providers.standard.operators.empty import 
EmptyOperator
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to