This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 407753c3369 Use relative path in ParseImportError.filename (#51406)
407753c3369 is described below

commit 407753c3369b690717d2f80fa483ecb5bb0aead7
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Thu Jun 12 12:43:59 2025 +0100

    Use relative path in ParseImportError.filename (#51406)

    * Use relative path in ParseImportError.filename

      This aligns with our use of relative path in DAGs

    * fixup! Use relative path in ParseImportError.filename

    * revert empty string default for relative_fileloc

    * Fix typo and query explanatory comment

    * Apply suggestions from code review

      Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>

    ---------

    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
---
 airflow-core/src/airflow/api/common/delete_dag.py  |  2 +-
 .../core_api/routes/public/import_error.py         | 14 +++-
 .../src/airflow/dag_processing/collection.py       | 86 ++++++++++++++--------
 airflow-core/src/airflow/dag_processing/manager.py |  9 ++-
 airflow-core/src/airflow/models/dagbag.py          | 38 +++++++---
 airflow-core/src/airflow/models/errors.py          |  8 +-
 .../core_api/routes/public/test_import_error.py    |  4 +-
 .../tests/unit/dag_processing/test_collection.py   | 23 ++++--
 8 files changed, 124 insertions(+), 60 deletions(-)

diff --git a/airflow-core/src/airflow/api/common/delete_dag.py b/airflow-core/src/airflow/api/common/delete_dag.py
index 2f6076252ba..9953c9aace8 100644
--- a/airflow-core/src/airflow/api/common/delete_dag.py
+++ b/airflow-core/src/airflow/api/common/delete_dag.py
@@ -76,7 +76,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
     session.execute(
         delete(ParseImportError)
         .where(
-            ParseImportError.filename == dag.fileloc,
+            ParseImportError.filename == dag.relative_fileloc,
             ParseImportError.bundle_name == dag.bundle_name,
         )
         .execution_options(synchronize_session="fetch")
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
index 4beb0ea2cd4..feb475c91c3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
@@ -151,15 +151,21 @@ def get_import_errors(
         # if the user doesn't have access to all DAGs, only display errors from visible DAGs
         readable_dag_ids = auth_manager.get_authorized_dag_ids(method="GET", user=user)
         # Build a cte that fetches dag_ids for each file location
-        visiable_files_cte = (
-            select(DagModel.fileloc, DagModel.dag_id).where(DagModel.dag_id.in_(readable_dag_ids)).cte()
+        visible_files_cte = (
+            select(DagModel.relative_fileloc, DagModel.dag_id, DagModel.bundle_name)
+            .where(DagModel.dag_id.in_(readable_dag_ids))
+            .cte()
         )
         # Prepare the import errors query by joining with the cte.
         # Each returned row will be a tuple: (ParseImportError, dag_id)
         import_errors_stmt = (
-            select(ParseImportError, visiable_files_cte.c.dag_id)
-            .join(visiable_files_cte, ParseImportError.filename == visiable_files_cte.c.fileloc)
+            select(ParseImportError, visible_files_cte.c.dag_id)
+            .join(
+                visible_files_cte,
+                ParseImportError.filename == visible_files_cte.c.relative_fileloc,
+                ParseImportError.bundle_name == visible_files_cte.c.bundle_name,
+            )
             .order_by(ParseImportError.id)
         )
diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py
index 9cf13b52693..200d2bc6145 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -208,8 +208,12 @@ def _serialize_dag_capturing_errors(
     except Exception:
         log.exception("Failed to write serialized DAG dag_id=%s fileloc=%s", dag.dag_id, dag.fileloc)
         dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
-        # todo AIP-66: this needs to use bundle name / rel fileloc instead
-        return [(dag.fileloc, traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]
+        return [
+            (
+                (bundle_name, dag.relative_fileloc),
+                traceback.format_exc(limit=-dagbag_import_error_traceback_depth),
+            )
+        ]


 def _sync_dag_perms(dag: MaybeSerializedDAG, session: Session):
@@ -245,7 +249,10 @@ def _update_dag_warnings(


 def _update_import_errors(
-    files_parsed: set[str], bundle_name: str, import_errors: dict[str, str], session: Session
+    files_parsed: set[tuple[str, str]],
+    bundle_name: str,
+    import_errors: dict[tuple[str, str], str],
+    session: Session,
 ):
     from airflow.listeners.listener import get_listener_manager

@@ -254,51 +261,67 @@ def _update_import_errors(
     session.execute(
         delete(ParseImportError).where(
-            ParseImportError.filename.in_(list(files_parsed)), ParseImportError.bundle_name == bundle_name
+            tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_parsed)
         )
     )
+    # the below query has to match (bundle_name, filename) tuple in that order since the
+    # import_errors list is a dict with keys as (bundle_name, relative_fileloc)
     existing_import_error_files = set(
-        session.execute(select(ParseImportError.filename, ParseImportError.bundle_name))
+        session.execute(select(ParseImportError.bundle_name, ParseImportError.filename))
     )
-    # Add the errors of the processed files
-    for filename, stacktrace in import_errors.items():
-        if (filename, bundle_name) in existing_import_error_files:
-            session.query(ParseImportError).where(
-                ParseImportError.filename == filename, ParseImportError.bundle_name == bundle_name
-            ).update(
-                {
-                    "filename": filename,
-                    "bundle_name": bundle_name,
-                    "timestamp": utcnow(),
-                    "stacktrace": stacktrace,
-                },
+    for key, stacktrace in import_errors.items():
+        bundle_name_, relative_fileloc = key
+
+        if key in existing_import_error_files:
+            session.execute(
+                update(ParseImportError)
+                .where(
+                    ParseImportError.filename == relative_fileloc,
+                    ParseImportError.bundle_name == bundle_name_,
+                )
+                .values(
+                    filename=relative_fileloc,
+                    bundle_name=bundle_name_,
+                    timestamp=utcnow(),
+                    stacktrace=stacktrace,
+                ),
             )
             # sending notification when an existing dag import error occurs
             try:
+                # todo: make listener accept bundle_name and relative_filename
+                import_error = session.scalar(
+                    select(ParseImportError).where(
+                        ParseImportError.bundle_name == bundle_name_,
+                        ParseImportError.filename == relative_fileloc,
+                    )
+                )
                 get_listener_manager().hook.on_existing_dag_import_error(
-                    filename=filename, stacktrace=stacktrace
+                    filename=import_error.full_file_path(), stacktrace=stacktrace
                 )
             except Exception:
                 log.exception("error calling listener")
         else:
-            session.add(
-                ParseImportError(
-                    filename=filename,
-                    bundle_name=bundle_name,
-                    timestamp=utcnow(),
-                    stacktrace=stacktrace,
-                )
+            import_error = ParseImportError(
+                filename=relative_fileloc,
+                bundle_name=bundle_name,
+                timestamp=utcnow(),
+                stacktrace=stacktrace,
             )
+            session.add(import_error)
             # sending notification when a new dag import error occurs
             try:
-                get_listener_manager().hook.on_new_dag_import_error(filename=filename, stacktrace=stacktrace)
+                get_listener_manager().hook.on_new_dag_import_error(
+                    filename=import_error.full_file_path(), stacktrace=stacktrace
+                )
             except Exception:
                 log.exception("error calling listener")

         session.execute(
             update(DagModel)
-            .where(DagModel.fileloc == filename)
+            .where(
+                DagModel.relative_fileloc == relative_fileloc,
+            )
             .values(
                 has_import_errors=True,
                 bundle_name=bundle_name,
@@ -312,7 +335,7 @@ def update_dag_parsing_results_in_db(
     bundle_name: str,
     bundle_version: str | None,
     dags: Collection[MaybeSerializedDAG],
-    import_errors: dict[str, str],
+    import_errors: dict[tuple[str, str], str],
     warnings: set[DagWarning],
     session: Session,
     *,
@@ -362,13 +385,16 @@ def update_dag_parsing_results_in_db(
     # Only now we are "complete" do we update import_errors - don't want to record errors from
     # previous failed attempts
     import_errors.update(dict(serialize_errors))
-    # Record import errors into the ORM - we don't retry on this one as it's not as critical that it works
     try:
         # TODO: This won't clear errors for files that exist that no longer contain DAGs. Do we need to pass
         # in the list of file parsed?
-        good_dag_filelocs = {dag.fileloc for dag in dags if dag.fileloc not in import_errors}
+        good_dag_filelocs = {
+            (bundle_name, dag.relative_fileloc)
+            for dag in dags
+            if dag.relative_fileloc is not None and (bundle_name, dag.relative_fileloc) not in import_errors
+        }
         _update_import_errors(
             files_parsed=good_dag_filelocs,
             bundle_name=bundle_name,
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 8b0cab74b6e..272e086d909 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -573,7 +573,7 @@ class DagFileProcessorManager(LoggingMixin):
             self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
             self.clear_orphaned_import_errors(
                 bundle_name=bundle.name,
-                observed_filelocs={str(x.absolute_path) for x in found_files},  # todo: make relative
+                observed_filelocs={str(x.rel_path) for x in found_files},  # todo: make relative
             )

     def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
@@ -1141,11 +1141,16 @@ def process_parse_results(
         stat.import_errors = 1
     else:
         # record DAGs and import errors to database
+        import_errors = {}
+        if parsing_result.import_errors:
+            import_errors = {
+                (bundle_name, rel_path): error for rel_path, error in parsing_result.import_errors.items()
+            }
         update_dag_parsing_results_in_db(
             bundle_name=bundle_name,
             bundle_version=bundle_version,
             dags=parsing_result.serialized_dags,
-            import_errors=parsing_result.import_errors or {},
+            import_errors=import_errors,
             warnings=set(parsing_result.warnings or []),
             session=session,
         )
diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py
index b8ffdd24b50..bd7f13fb78e 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -130,7 +130,7 @@ class DagBag(LoggingMixin):
         bundle_path: Path | None = None,
     ):
         super().__init__()
-        self.bundle_path: Path | None = bundle_path
+        self.bundle_path = bundle_path
         include_examples = (
             include_examples
             if isinstance(include_examples, bool)
@@ -145,6 +145,7 @@ class DagBag(LoggingMixin):
         self.dags: dict[str, DAG] = {}
         # the file's last modified timestamp when we last read it
         self.file_last_changed: dict[str, datetime] = {}
+        # Store import errors with relative file paths as keys (relative to bundle_path)
         self.import_errors: dict[str, str] = {}
         self.captured_warnings: dict[str, tuple[str, ...]] = {}
         self.has_logged = False
@@ -356,6 +357,17 @@ class DagBag(LoggingMixin):
         )
         return warnings

+    def _get_relative_fileloc(self, filepath: str) -> str:
+        """
+        Get the relative file location for a given filepath.
+
+        :param filepath: Absolute path to the file
+        :return: Relative path from bundle_path, or original filepath if no bundle_path
+        """
+        if self.bundle_path:
+            return str(Path(filepath).relative_to(self.bundle_path))
+        return filepath
+
     def _load_modules_from_file(self, filepath, safe_mode):
         from airflow.sdk.definitions._internal.contextmanager import DagContext

@@ -363,7 +375,8 @@ class DagBag(LoggingMixin):
             """Handle SIGSEGV signal and let the user know that the import failed."""
             msg = f"Received SIGSEGV signal while processing {filepath}."
             self.log.error(msg)
-            self.import_errors[filepath] = msg
+            relative_filepath = self._get_relative_fileloc(filepath)
+            self.import_errors[relative_filepath] = msg

         try:
             signal.signal(signal.SIGSEGV, handler)
@@ -402,12 +415,13 @@ class DagBag(LoggingMixin):
                 # This would also catch `exit()` in a dag file
                 DagContext.autoregistered_dags.clear()
                 self.log.exception("Failed to import: %s", filepath)
+                relative_filepath = self._get_relative_fileloc(filepath)
                 if self.dagbag_import_error_tracebacks:
-                    self.import_errors[filepath] = traceback.format_exc(
+                    self.import_errors[relative_filepath] = traceback.format_exc(
                         limit=-self.dagbag_import_error_traceback_depth
                     )
                 else:
-                    self.import_errors[filepath] = str(e)
+                    self.import_errors[relative_filepath] = str(e)
                 return []

         dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
@@ -467,12 +481,13 @@ class DagBag(LoggingMixin):
                     DagContext.autoregistered_dags.clear()
                     fileloc = os.path.join(filepath, zip_info.filename)
                     self.log.exception("Failed to import: %s", fileloc)
+                    relative_fileloc = self._get_relative_fileloc(fileloc)
                     if self.dagbag_import_error_tracebacks:
-                        self.import_errors[fileloc] = traceback.format_exc(
+                        self.import_errors[relative_fileloc] = traceback.format_exc(
                             limit=-self.dagbag_import_error_traceback_depth
                         )
                     else:
-                        self.import_errors[fileloc] = str(e)
+                        self.import_errors[relative_fileloc] = str(e)
         finally:
             if sys.path[0] == filepath:
                 del sys.path[0]
@@ -494,10 +509,8 @@ class DagBag(LoggingMixin):

         for dag, mod in top_level_dags:
             dag.fileloc = mod.__file__
-            if self.bundle_path:
-                dag.relative_fileloc = str(Path(mod.__file__).relative_to(self.bundle_path))
-            else:
-                dag.relative_fileloc = dag.fileloc
+            relative_fileloc = self._get_relative_fileloc(dag.fileloc)
+            dag.relative_fileloc = relative_fileloc
             try:
                 dag.validate()
                 self.bag_dag(dag=dag)
@@ -505,7 +518,7 @@ class DagBag(LoggingMixin):
                 pass
             except Exception as e:
                 self.log.exception("Failed to bag_dag: %s", dag.fileloc)
-                self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
+                self.import_errors[relative_fileloc] = f"{type(e).__name__}: {e}"
                 self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
             else:
                 found_dags.append(dag)
@@ -665,12 +678,13 @@ class DagBag(LoggingMixin):
             else LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
             for dag in self.dags.values()
         ]
+        import_errors = {(bundle_name, rel_path): error for rel_path, error in self.import_errors.items()}

         update_dag_parsing_results_in_db(
             bundle_name,
             bundle_version,
             dags,
-            self.import_errors,
+            import_errors,
             self.dag_warnings,
             session=session,
         )
diff --git a/airflow-core/src/airflow/models/errors.py b/airflow-core/src/airflow/models/errors.py
index 748d56c46b4..6670df1dfaf 100644
--- a/airflow-core/src/airflow/models/errors.py
+++ b/airflow-core/src/airflow/models/errors.py
@@ -19,6 +19,7 @@ from __future__ import annotations

 from sqlalchemy import Column, Integer, String, Text

+from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.models.base import Base, StringID
 from airflow.utils.sqlalchemy import UtcDateTime

@@ -29,6 +30,11 @@ class ParseImportError(Base):
     __tablename__ = "import_error"
     id = Column(Integer, primary_key=True)
     timestamp = Column(UtcDateTime)
-    filename = Column(String(1024))  # todo AIP-66: make this bundle and relative fileloc
+    filename = Column(String(1024))
     bundle_name = Column(StringID())
     stacktrace = Column(Text)
+
+    def full_file_path(self) -> str:
+        """Return the full file path of the dag."""
+        bundle = DagBundlesManager().get_bundle(self.bundle_name)
+        return "/".join([str(bundle.path), self.filename])
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
index 4269feeaac3..7654b0e62b9 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
@@ -51,7 +51,7 @@ BUNDLE_NAME = "dag_maker"
 @pytest.fixture(scope="class")
 @provide_session
 def permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
-    dag_model = DagModel(fileloc=FILENAME1, dag_id="dag_id1", is_paused=False)
+    dag_model = DagModel(fileloc=FILENAME1, relative_fileloc=FILENAME1, dag_id="dag_id1", is_paused=False)
     session.add(dag_model)
     session.commit()
     return dag_model
@@ -60,7 +60,7 @@ def permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
 @pytest.fixture(scope="class")
 @provide_session
 def not_permitted_dag_model(session: Session = NEW_SESSION) -> DagModel:
-    dag_model = DagModel(fileloc=FILENAME1, dag_id="dag_id4", is_paused=False)
+    dag_model = DagModel(fileloc=FILENAME1, relative_fileloc=FILENAME1, dag_id="dag_id4", is_paused=False)
     session.add(dag_model)
     session.commit()
     return dag_model
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index bb10742e6e6..dfaa0b5ec26 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -453,28 +453,29 @@ class TestUpdateDagParsingResults:
         new_serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
         assert new_serialized_dags_count == 1

+    @patch.object(ParseImportError, "full_file_path")
     @patch.object(SerializedDagModel, "write_dag")
     def test_serialized_dag_errors_are_import_errors(
-        self, mock_serialize, caplog, session, dag_import_error_listener, testing_dag_bundle
+        self, mock_serialize, mock_full_path, caplog, session, dag_import_error_listener, testing_dag_bundle
     ):
         """
         Test that errors serializing a DAG are recorded as import_errors in the DB
         """
         mock_serialize.side_effect = SerializationError
-        caplog.set_level(logging.ERROR)

         dag = DAG(dag_id="test")
         dag.fileloc = "abc.py"
+        dag.relative_fileloc = "abc.py"
+        mock_full_path.return_value = "abc.py"
         import_errors = {}

         update_dag_parsing_results_in_db("testing", None, [dag], import_errors, set(), session)

         assert "SerializationError" in caplog.text

         # Should have been edited in place
-        err = import_errors.get(dag.fileloc)
+        err = import_errors.get(("testing", dag.relative_fileloc))
         assert "SerializationError" in err
-
         dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
         assert dag_model.has_import_errors is True
@@ -482,7 +483,7 @@ class TestUpdateDagParsingResults:
         assert len(import_errors) == 1
         import_error = import_errors[0]
-        assert import_error.filename == dag.fileloc
+        assert import_error.filename == dag.relative_fileloc
         assert "SerializationError" in import_error.stacktrace

         # Ensure the listener was notified
@@ -490,13 +491,17 @@ class TestUpdateDagParsingResults:
         assert len(dag_import_error_listener.existing) == 0
         assert dag_import_error_listener.new["abc.py"] == import_error.stacktrace

-    def test_new_import_error_replaces_old(self, session, dag_import_error_listener, testing_dag_bundle):
+    @patch.object(ParseImportError, "full_file_path")
+    def test_new_import_error_replaces_old(
+        self, mock_full_file_path, session, dag_import_error_listener, testing_dag_bundle
+    ):
         """
         Test that existing import error is updated and new record not created
         for a dag with the same filename
         """
         bundle_name = "testing"
         filename = "abc.py"
+        mock_full_file_path.return_value = filename
         prev_error = ParseImportError(
             filename=filename,
             bundle_name=bundle_name,
@@ -511,7 +516,7 @@ class TestUpdateDagParsingResults:
             bundle_name=bundle_name,
             bundle_version=None,
             dags=[],
-            import_errors={"abc.py": "New error"},
+            import_errors={("testing", "abc.py"): "New error"},
             warnings=set(),
             session=session,
         )
@@ -560,6 +565,7 @@ class TestUpdateDagParsingResults:

         dag = DAG(dag_id="test")
         dag.fileloc = filename
+        dag.relative_fileloc = filename
         import_errors = {}

         update_dag_parsing_results_in_db(bundle_name, None, [dag], import_errors, set(), session)
@@ -593,7 +599,8 @@ class TestUpdateDagParsingResults:
         session.flush()
         dag = DAG(dag_id="test")
         dag.fileloc = filename
-        import_errors = {filename: "Some error"}
+        dag.relative_fileloc = filename
+        import_errors = {(bundle_name, filename): "Some error"}
         update_dag_parsing_results_in_db(bundle_name, None, [dag], import_errors, set(), session)
         dag_model = session.get(DagModel, (dag.dag_id,))
         assert dag_model.has_import_errors is True
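
For readers adapting code that consumes import errors, here is a minimal standalone Python sketch of the convention this commit establishes. It is not part of the commit; the bundle name, bundle path, and error text below are made-up examples, and full_file_path here is a plain function standing in for the new ParseImportError.full_file_path() method. DagBag.import_errors is now keyed by paths relative to the bundle root, the dict handed to update_dag_parsing_results_in_db is keyed by (bundle_name, relative_fileloc) tuples, and the absolute path is rebuilt from the bundle's local path only when needed (e.g. for listeners):

from pathlib import Path

# Hypothetical values for illustration only; in Airflow these come from the configured
# DAG bundle and the DAG processor, not from constants like these.
bundle_name = "dags-folder"
bundle_root = Path("/opt/airflow/bundles/dags-folder")

# DagBag.import_errors: keyed by paths relative to the bundle root.
absolute_errors = {
    "/opt/airflow/bundles/dags-folder/broken_dag.py": "ImportError: No module named 'foo'",
}
relative_errors = {
    str(Path(fileloc).relative_to(bundle_root)): stacktrace
    for fileloc, stacktrace in absolute_errors.items()
}

# Before reaching the DB, process_parse_results() / DagBag.sync_to_db() re-key the dict
# as (bundle_name, relative_fileloc) tuples for update_dag_parsing_results_in_db().
import_errors_for_db = {
    (bundle_name, rel_path): stacktrace for rel_path, stacktrace in relative_errors.items()
}


def full_file_path(bundle_path: Path, relative_filename: str) -> str:
    # Mirrors ParseImportError.full_file_path(): join the bundle's local path with the
    # stored relative filename to recover an absolute path for listeners and the UI.
    return "/".join([str(bundle_path), relative_filename])


print(import_errors_for_db)  # {('dags-folder', 'broken_dag.py'): "ImportError: ..."}
print(full_file_path(bundle_root, "broken_dag.py"))  # /opt/airflow/bundles/dags-folder/broken_dag.py

Storing the relative path keeps ParseImportError.filename consistent with DagModel.relative_fileloc, which, per the commit message, aligns import errors with how DAG file locations are already tracked.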