This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 407753c3369 Use relative path in ParseImportError.filename (#51406)
407753c3369 is described below

commit 407753c3369b690717d2f80fa483ecb5bb0aead7
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Thu Jun 12 12:43:59 2025 +0100

    Use relative path in ParseImportError.filename (#51406)
    
    * Use relative path in ParseImportError.filename
    
    This aligns with our use of relative path in DAGs
    
    * fixup! Use relative path in ParseImportError.filename
    
    * revert empty string default for relative_fileloc
    
    * Fix typo and query explanatory comment
    
    * Apply suggestions from code review
    
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
---
 airflow-core/src/airflow/api/common/delete_dag.py  |  2 +-
 .../core_api/routes/public/import_error.py         | 14 +++-
 .../src/airflow/dag_processing/collection.py       | 86 ++++++++++++++--------
 airflow-core/src/airflow/dag_processing/manager.py |  9 ++-
 airflow-core/src/airflow/models/dagbag.py          | 38 +++++++---
 airflow-core/src/airflow/models/errors.py          |  8 +-
 .../core_api/routes/public/test_import_error.py    |  4 +-
 .../tests/unit/dag_processing/test_collection.py   | 23 ++++--
 8 files changed, 124 insertions(+), 60 deletions(-)
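
The central change, visible across every file below: import errors are keyed by a (bundle_name, relative_fileloc) tuple instead of an absolute file path, matching the relative_fileloc that DagModel rows already carry. A minimal sketch of that keying, assuming a known bundle root (the helper name here is illustrative, not part of this commit):

    from pathlib import Path

    def error_key(bundle_name: str, bundle_path: Path, fileloc: str) -> tuple[str, str]:
        # Key errors by bundle plus bundle-relative path. Fall back to the raw
        # location for files outside the bundle (the committed DagBag helper
        # instead only checks whether bundle_path is set at all).
        try:
            rel = str(Path(fileloc).relative_to(bundle_path))
        except ValueError:
            rel = fileloc
        return (bundle_name, rel)

    # error_key("example", Path("/bundles/example"), "/bundles/example/dags/abc.py")
    # -> ("example", "dags/abc.py")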

diff --git a/airflow-core/src/airflow/api/common/delete_dag.py b/airflow-core/src/airflow/api/common/delete_dag.py
index 2f6076252ba..9953c9aace8 100644
--- a/airflow-core/src/airflow/api/common/delete_dag.py
+++ b/airflow-core/src/airflow/api/common/delete_dag.py
@@ -76,7 +76,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
     session.execute(
         delete(ParseImportError)
         .where(
-            ParseImportError.filename == dag.fileloc,
+            ParseImportError.filename == dag.relative_fileloc,
             ParseImportError.bundle_name == dag.bundle_name,
         )
         .execution_options(synchronize_session="fetch")
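
Both predicates above matter: relative paths are only unique within a bundle, so a filter on filename alone could touch rows belonging to other bundles. An illustration of the collision being guarded against (bundle names invented):

    from airflow.models.errors import ParseImportError

    # Two error rows sharing a relative filename but owned by different
    # bundles; a delete keyed on filename alone would remove both.
    row_a = ParseImportError(filename="dags/abc.py", bundle_name="bundle-a")
    row_b = ParseImportError(filename="dags/abc.py", bundle_name="bundle-b")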
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
index 4beb0ea2cd4..feb475c91c3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
@@ -151,15 +151,21 @@ def get_import_errors(
     # if the user doesn't have access to all DAGs, only display errors from visible DAGs
     readable_dag_ids = auth_manager.get_authorized_dag_ids(method="GET", user=user)
     # Build a cte that fetches dag_ids for each file location
-    visiable_files_cte = (
-        select(DagModel.fileloc, DagModel.dag_id).where(DagModel.dag_id.in_(readable_dag_ids)).cte()
+    visible_files_cte = (
+        select(DagModel.relative_fileloc, DagModel.dag_id, DagModel.bundle_name)
+        .where(DagModel.dag_id.in_(readable_dag_ids))
+        .cte()
     )
 
     # Prepare the import errors query by joining with the cte.
     # Each returned row will be a tuple: (ParseImportError, dag_id)
     import_errors_stmt = (
-        select(ParseImportError, visiable_files_cte.c.dag_id)
-        .join(visiable_files_cte, ParseImportError.filename == visiable_files_cte.c.fileloc)
+        select(ParseImportError, visible_files_cte.c.dag_id)
+        .join(
+            visible_files_cte,
+            ParseImportError.filename == visible_files_cte.c.relative_fileloc,
+            ParseImportError.bundle_name == visible_files_cte.c.bundle_name,
+        )
         .order_by(ParseImportError.id)
     )
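
The join now matches on both relative path and bundle name so that only errors from files backing readable DAGs are listed. Stock SQLAlchemy's Select.join() takes a single ON clause, so a standalone sketch would combine the two conditions with and_(); readable_dag_ids comes from the auth manager as in the route above:

    from sqlalchemy import and_, select

    from airflow.models.dag import DagModel
    from airflow.models.errors import ParseImportError

    def visible_import_errors_stmt(readable_dag_ids: set[str]):
        # CTE of (relative_fileloc, dag_id, bundle_name) for readable DAGs only.
        visible_files_cte = (
            select(DagModel.relative_fileloc, DagModel.dag_id, DagModel.bundle_name)
            .where(DagModel.dag_id.in_(readable_dag_ids))
            .cte()
        )
        # Join errors to visible files on BOTH relative path and bundle name.
        return (
            select(ParseImportError, visible_files_cte.c.dag_id)
            .join(
                visible_files_cte,
                and_(
                    ParseImportError.filename == visible_files_cte.c.relative_fileloc,
                    ParseImportError.bundle_name == visible_files_cte.c.bundle_name,
                ),
            )
            .order_by(ParseImportError.id)
        )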
 
diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py
index 9cf13b52693..200d2bc6145 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -208,8 +208,12 @@ def _serialize_dag_capturing_errors(
     except Exception:
         log.exception("Failed to write serialized DAG dag_id=%s fileloc=%s", 
dag.dag_id, dag.fileloc)
         dagbag_import_error_traceback_depth = conf.getint("core", 
"dagbag_import_error_traceback_depth")
-        # todo AIP-66: this needs to use bundle name / rel fileloc instead
-        return [(dag.fileloc, 
traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]
+        return [
+            (
+                (bundle_name, dag.relative_fileloc),
+                
traceback.format_exc(limit=-dagbag_import_error_traceback_depth),
+            )
+        ]
 
 
 def _sync_dag_perms(dag: MaybeSerializedDAG, session: Session):
@@ -245,7 +249,10 @@ def _update_dag_warnings(
 
 
 def _update_import_errors(
-    files_parsed: set[str], bundle_name: str, import_errors: dict[str, str], session: Session
+    files_parsed: set[tuple[str, str]],
+    bundle_name: str,
+    import_errors: dict[tuple[str, str], str],
+    session: Session,
 ):
     from airflow.listeners.listener import get_listener_manager
 
@@ -254,51 +261,67 @@ def _update_import_errors(
 
     session.execute(
         delete(ParseImportError).where(
-            ParseImportError.filename.in_(list(files_parsed)), ParseImportError.bundle_name == bundle_name
+            tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_parsed)
         )
     )
 
+    # the below query has to match (bundle_name, filename) tuple in that order since the
+    # import_errors list is a dict with keys as (bundle_name, relative_fileloc)
     existing_import_error_files = set(
-        session.execute(select(ParseImportError.filename, ParseImportError.bundle_name))
+        session.execute(select(ParseImportError.bundle_name, ParseImportError.filename))
     )
-
     # Add the errors of the processed files
-    for filename, stacktrace in import_errors.items():
-        if (filename, bundle_name) in existing_import_error_files:
-            session.query(ParseImportError).where(
-                ParseImportError.filename == filename, ParseImportError.bundle_name == bundle_name
-            ).update(
-                {
-                    "filename": filename,
-                    "bundle_name": bundle_name,
-                    "timestamp": utcnow(),
-                    "stacktrace": stacktrace,
-                },
+    for key, stacktrace in import_errors.items():
+        bundle_name_, relative_fileloc = key
+
+        if key in existing_import_error_files:
+            session.execute(
+                update(ParseImportError)
+                .where(
+                    ParseImportError.filename == relative_fileloc,
+                    ParseImportError.bundle_name == bundle_name_,
+                )
+                .values(
+                    filename=relative_fileloc,
+                    bundle_name=bundle_name_,
+                    timestamp=utcnow(),
+                    stacktrace=stacktrace,
+                ),
             )
             # sending notification when an existing dag import error occurs
             try:
+                # todo: make listener accept bundle_name and relative_filename
+                import_error = session.scalar(
+                    select(ParseImportError).where(
+                        ParseImportError.bundle_name == bundle_name_,
+                        ParseImportError.filename == relative_fileloc,
+                    )
+                )
                 get_listener_manager().hook.on_existing_dag_import_error(
-                    filename=filename, stacktrace=stacktrace
+                    filename=import_error.full_file_path(), stacktrace=stacktrace
                 )
             except Exception:
                 log.exception("error calling listener")
         else:
-            session.add(
-                ParseImportError(
-                    filename=filename,
-                    bundle_name=bundle_name,
-                    timestamp=utcnow(),
-                    stacktrace=stacktrace,
-                )
+            import_error = ParseImportError(
+                filename=relative_fileloc,
+                bundle_name=bundle_name,
+                timestamp=utcnow(),
+                stacktrace=stacktrace,
             )
+            session.add(import_error)
             # sending notification when a new dag import error occurs
             try:
-                get_listener_manager().hook.on_new_dag_import_error(filename=filename, stacktrace=stacktrace)
+                get_listener_manager().hook.on_new_dag_import_error(
+                    filename=import_error.full_file_path(), stacktrace=stacktrace
+                )
             except Exception:
                 log.exception("error calling listener")
         session.execute(
             update(DagModel)
-            .where(DagModel.fileloc == filename)
+            .where(
+                DagModel.relative_fileloc == relative_fileloc,
+            )
             .values(
                 has_import_errors=True,
                 bundle_name=bundle_name,
@@ -312,7 +335,7 @@ def update_dag_parsing_results_in_db(
     bundle_name: str,
     bundle_version: str | None,
     dags: Collection[MaybeSerializedDAG],
-    import_errors: dict[str, str],
+    import_errors: dict[tuple[str, str], str],
     warnings: set[DagWarning],
     session: Session,
     *,
@@ -362,13 +385,16 @@ def update_dag_parsing_results_in_db(
             # Only now we are "complete" do we update import_errors - don't want to record errors from
             # previous failed attempts
             import_errors.update(dict(serialize_errors))
-
     # Record import errors into the ORM - we don't retry on this one as it's not as critical that it works
     try:
         # TODO: This won't clear errors for files that exist that no longer contain DAGs. Do we need to pass
         # in the list of file parsed?
 
-        good_dag_filelocs = {dag.fileloc for dag in dags if dag.fileloc not in import_errors}
+        good_dag_filelocs = {
+            (bundle_name, dag.relative_fileloc)
+            for dag in dags
+            if dag.relative_fileloc is not None and (bundle_name, dag.relative_fileloc) not in import_errors
+        }
         _update_import_errors(
             files_parsed=good_dag_filelocs,
             bundle_name=bundle_name,
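
The tuple_(...).in_(...) construct introduced above emits row-value IN matching, so a single DELETE clears the error rows for every (bundle_name, filename) pair that parsed cleanly. The same pattern in isolation (names invented; tuple-valued IN depends on backend support, e.g. PostgreSQL and MySQL handle it natively):

    from sqlalchemy import delete, tuple_

    from airflow.models.errors import ParseImportError

    files_parsed = {
        ("example", "dags/good.py"),
        ("example", "dags/also_good.py"),
    }
    # Roughly: DELETE FROM import_error
    #          WHERE (bundle_name, filename) IN (('example', 'dags/good.py'), ...)
    stmt = delete(ParseImportError).where(
        tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_parsed)
    )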
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 8b0cab74b6e..272e086d909 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -573,7 +573,7 @@ class DagFileProcessorManager(LoggingMixin):
             self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
             self.clear_orphaned_import_errors(
                 bundle_name=bundle.name,
-                observed_filelocs={str(x.absolute_path) for x in found_files},  # todo: make relative
+                observed_filelocs={str(x.rel_path) for x in found_files},  # todo: make relative
             )
 
     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
@@ -1141,11 +1141,16 @@ def process_parse_results(
         stat.import_errors = 1
     else:
         # record DAGs and import errors to database
+        import_errors = {}
+        if parsing_result.import_errors:
+            import_errors = {
+                (bundle_name, rel_path): error for rel_path, error in parsing_result.import_errors.items()
+            }
         update_dag_parsing_results_in_db(
             bundle_name=bundle_name,
             bundle_version=bundle_version,
             dags=parsing_result.serialized_dags,
-            import_errors=parsing_result.import_errors or {},
+            import_errors=import_errors,
             warnings=set(parsing_result.warnings or []),
             session=session,
         )
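
The parser still reports errors keyed by relative path alone; since one call of process_parse_results handles a single bundle, the bundle name is simply prefixed onto every key before the DB update. The re-keying step in isolation (values invented):

    # Parser output, keyed by relative path only.
    parsed_errors = {"dags/abc.py": "ImportError: No module named 'foo'"}
    bundle_name = "example"

    # Keys in the shape the DB layer now expects: (bundle_name, relative_fileloc).
    import_errors = {(bundle_name, rel): err for rel, err in parsed_errors.items()}
    assert import_errors == {("example", "dags/abc.py"): "ImportError: No module named 'foo'"}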
diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py
index b8ffdd24b50..bd7f13fb78e 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -130,7 +130,7 @@ class DagBag(LoggingMixin):
         bundle_path: Path | None = None,
     ):
         super().__init__()
-        self.bundle_path: Path | None = bundle_path
+        self.bundle_path = bundle_path
         include_examples = (
             include_examples
             if isinstance(include_examples, bool)
@@ -145,6 +145,7 @@ class DagBag(LoggingMixin):
         self.dags: dict[str, DAG] = {}
         # the file's last modified timestamp when we last read it
         self.file_last_changed: dict[str, datetime] = {}
+        # Store import errors with relative file paths as keys (relative to bundle_path)
         self.import_errors: dict[str, str] = {}
         self.captured_warnings: dict[str, tuple[str, ...]] = {}
         self.has_logged = False
@@ -356,6 +357,17 @@ class DagBag(LoggingMixin):
                 )
         return warnings
 
+    def _get_relative_fileloc(self, filepath: str) -> str:
+        """
+        Get the relative file location for a given filepath.
+
+        :param filepath: Absolute path to the file
+        :return: Relative path from bundle_path, or original filepath if no bundle_path
+        """
+        if self.bundle_path:
+            return str(Path(filepath).relative_to(self.bundle_path))
+        return filepath
+
     def _load_modules_from_file(self, filepath, safe_mode):
         from airflow.sdk.definitions._internal.contextmanager import DagContext
 
@@ -363,7 +375,8 @@ class DagBag(LoggingMixin):
             """Handle SIGSEGV signal and let the user know that the import 
failed."""
             msg = f"Received SIGSEGV signal while processing {filepath}."
             self.log.error(msg)
-            self.import_errors[filepath] = msg
+            relative_filepath = self._get_relative_fileloc(filepath)
+            self.import_errors[relative_filepath] = msg
 
         try:
             signal.signal(signal.SIGSEGV, handler)
@@ -402,12 +415,13 @@ class DagBag(LoggingMixin):
                 # This would also catch `exit()` in a dag file
                 DagContext.autoregistered_dags.clear()
                 self.log.exception("Failed to import: %s", filepath)
+                relative_filepath = self._get_relative_fileloc(filepath)
                 if self.dagbag_import_error_tracebacks:
-                    self.import_errors[filepath] = traceback.format_exc(
+                    self.import_errors[relative_filepath] = traceback.format_exc(
                         limit=-self.dagbag_import_error_traceback_depth
                     )
                 else:
-                    self.import_errors[filepath] = str(e)
+                    self.import_errors[relative_filepath] = str(e)
                 return []
 
         dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
@@ -467,12 +481,13 @@ class DagBag(LoggingMixin):
                     DagContext.autoregistered_dags.clear()
                     fileloc = os.path.join(filepath, zip_info.filename)
                     self.log.exception("Failed to import: %s", fileloc)
+                    relative_fileloc = self._get_relative_fileloc(fileloc)
                     if self.dagbag_import_error_tracebacks:
-                        self.import_errors[fileloc] = traceback.format_exc(
+                        self.import_errors[relative_fileloc] = traceback.format_exc(
                             limit=-self.dagbag_import_error_traceback_depth
                         )
                     else:
-                        self.import_errors[fileloc] = str(e)
+                        self.import_errors[relative_fileloc] = str(e)
                 finally:
                     if sys.path[0] == filepath:
                         del sys.path[0]
@@ -494,10 +509,8 @@ class DagBag(LoggingMixin):
 
         for dag, mod in top_level_dags:
             dag.fileloc = mod.__file__
-            if self.bundle_path:
-                dag.relative_fileloc = str(Path(mod.__file__).relative_to(self.bundle_path))
-            else:
-                dag.relative_fileloc = dag.fileloc
+            relative_fileloc = self._get_relative_fileloc(dag.fileloc)
+            dag.relative_fileloc = relative_fileloc
             try:
                 dag.validate()
                 self.bag_dag(dag=dag)
@@ -505,7 +518,7 @@ class DagBag(LoggingMixin):
                 pass
             except Exception as e:
                 self.log.exception("Failed to bag_dag: %s", dag.fileloc)
-                self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
+                self.import_errors[relative_fileloc] = f"{type(e).__name__}: 
{e}"
                 self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
             else:
                 found_dags.append(dag)
@@ -665,12 +678,13 @@ class DagBag(LoggingMixin):
             else LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
             for dag in self.dags.values()
         ]
+        import_errors = {(bundle_name, rel_path): error for rel_path, error in self.import_errors.items()}
 
         update_dag_parsing_results_in_db(
             bundle_name,
             bundle_version,
             dags,
-            self.import_errors,
+            import_errors,
             self.dag_warnings,
             session=session,
         )
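
The new _get_relative_fileloc helper centralizes the Path.relative_to logic that previously sat inline (see the hunk above). Its behavior in isolation (paths invented):

    from pathlib import Path

    bundle_path = Path("/opt/airflow/bundles/example")
    print(Path("/opt/airflow/bundles/example/dags/abc.py").relative_to(bundle_path))
    # -> dags/abc.py

    # Caveat: Path.relative_to raises ValueError for a path outside bundle_path.
    # The helper only guards against bundle_path being unset, so callers are
    # expected to pass files that actually live inside the bundle.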
diff --git a/airflow-core/src/airflow/models/errors.py b/airflow-core/src/airflow/models/errors.py
index 748d56c46b4..6670df1dfaf 100644
--- a/airflow-core/src/airflow/models/errors.py
+++ b/airflow-core/src/airflow/models/errors.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 from sqlalchemy import Column, Integer, String, Text
 
+from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.models.base import Base, StringID
 from airflow.utils.sqlalchemy import UtcDateTime
 
@@ -29,6 +30,11 @@ class ParseImportError(Base):
     __tablename__ = "import_error"
     id = Column(Integer, primary_key=True)
     timestamp = Column(UtcDateTime)
-    filename = Column(String(1024))  # todo AIP-66: make this bundle and relative fileloc
+    filename = Column(String(1024))
     bundle_name = Column(StringID())
     stacktrace = Column(Text)
+
+    def full_file_path(self) -> str:
+        """Return the full file path of the dag."""
+        bundle = DagBundlesManager().get_bundle(self.bundle_name)
+        return "/".join([str(bundle.path), self.filename])
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
index 4269feeaac3..7654b0e62b9 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
@@ -51,7 +51,7 @@ BUNDLE_NAME = "dag_maker"
 @pytest.fixture(scope="class")
 @provide_session
 def permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
-    dag_model = DagModel(fileloc=FILENAME1, dag_id="dag_id1", is_paused=False)
+    dag_model = DagModel(fileloc=FILENAME1, relative_fileloc=FILENAME1, dag_id="dag_id1", is_paused=False)
     session.add(dag_model)
     session.commit()
     return dag_model
@@ -60,7 +60,7 @@ def permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
 @pytest.fixture(scope="class")
 @provide_session
 def not_permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
-    dag_model = DagModel(fileloc=FILENAME1, dag_id="dag_id4", is_paused=False)
+    dag_model = DagModel(fileloc=FILENAME1, relative_fileloc=FILENAME1, dag_id="dag_id4", is_paused=False)
     session.add(dag_model)
     session.commit()
     return dag_model
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index bb10742e6e6..dfaa0b5ec26 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -453,28 +453,29 @@ class TestUpdateDagParsingResults:
         new_serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
         assert new_serialized_dags_count == 1
 
+    @patch.object(ParseImportError, "full_file_path")
     @patch.object(SerializedDagModel, "write_dag")
     def test_serialized_dag_errors_are_import_errors(
-        self, mock_serialize, caplog, session, dag_import_error_listener, testing_dag_bundle
+        self, mock_serialize, mock_full_path, caplog, session, dag_import_error_listener, testing_dag_bundle
     ):
         """
        Test that errors serializing a DAG are recorded as import_errors in the DB
         """
         mock_serialize.side_effect = SerializationError
-
         caplog.set_level(logging.ERROR)
 
         dag = DAG(dag_id="test")
         dag.fileloc = "abc.py"
+        dag.relative_fileloc = "abc.py"
+        mock_full_path.return_value = "abc.py"
 
         import_errors = {}
         update_dag_parsing_results_in_db("testing", None, [dag], 
import_errors, set(), session)
         assert "SerializationError" in caplog.text
 
         # Should have been edited in place
-        err = import_errors.get(dag.fileloc)
+        err = import_errors.get(("testing", dag.relative_fileloc))
         assert "SerializationError" in err
-
         dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
         assert dag_model.has_import_errors is True
 
@@ -482,7 +483,7 @@ class TestUpdateDagParsingResults:
 
         assert len(import_errors) == 1
         import_error = import_errors[0]
-        assert import_error.filename == dag.fileloc
+        assert import_error.filename == dag.relative_fileloc
         assert "SerializationError" in import_error.stacktrace
 
         # Ensure the listener was notified
@@ -490,13 +491,17 @@ class TestUpdateDagParsingResults:
         assert len(dag_import_error_listener.existing) == 0
         assert dag_import_error_listener.new["abc.py"] == import_error.stacktrace
 
-    def test_new_import_error_replaces_old(self, session, dag_import_error_listener, testing_dag_bundle):
+    @patch.object(ParseImportError, "full_file_path")
+    def test_new_import_error_replaces_old(
+        self, mock_full_file_path, session, dag_import_error_listener, testing_dag_bundle
+    ):
         """
         Test that existing import error is updated and new record not created
         for a dag with the same filename
         """
         bundle_name = "testing"
         filename = "abc.py"
+        mock_full_file_path.return_value = filename
         prev_error = ParseImportError(
             filename=filename,
             bundle_name=bundle_name,
@@ -511,7 +516,7 @@ class TestUpdateDagParsingResults:
             bundle_name=bundle_name,
             bundle_version=None,
             dags=[],
-            import_errors={"abc.py": "New error"},
+            import_errors={("testing", "abc.py"): "New error"},
             warnings=set(),
             session=session,
         )
@@ -560,6 +565,7 @@ class TestUpdateDagParsingResults:
 
         dag = DAG(dag_id="test")
         dag.fileloc = filename
+        dag.relative_fileloc = filename
 
         import_errors = {}
         update_dag_parsing_results_in_db(bundle_name, None, [dag], import_errors, set(), session)
@@ -593,7 +599,8 @@ class TestUpdateDagParsingResults:
         session.flush()
         dag = DAG(dag_id="test")
         dag.fileloc = filename
-        import_errors = {filename: "Some error"}
+        dag.relative_fileloc = filename
+        import_errors = {(bundle_name, filename): "Some error"}
         update_dag_parsing_results_in_db(bundle_name, None, [dag], import_errors, set(), session)
         dag_model = session.get(DagModel, (dag.dag_id,))
         assert dag_model.has_import_errors is True
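
A note on the patching pattern the tests above rely on: listener notifications now go through full_file_path(), which resolves bundles via DagBundlesManager, so unit tests stub the method out instead of configuring real bundles. Roughly:

    from unittest.mock import patch

    from airflow.models.errors import ParseImportError

    with patch.object(ParseImportError, "full_file_path", return_value="abc.py"):
        ...  # exercise update_dag_parsing_results_in_db and listener assertions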
