This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new 13b25673ac4 [v3-0-test] Fix Certain DAG import errors ("role does not
exist") don't persist in Airflow (#51511) (#54432)
13b25673ac4 is described below
commit 13b25673ac46be712ac537c9106cd9849bc24e8d
Author: Kevin Yang <[email protected]>
AuthorDate: Wed Aug 13 08:13:41 2025 -0400
[v3-0-test] Fix Certain DAG import errors ("role does not exist") don't
persist in Airflow (#51511) (#54432)
### Motivation
As described in #49651, when the access control for a DAG is set to a
non-existent role, the DAG import error shows up in the Airflow UI for a while and then
disappears. This update fixes the issue and lets the import error persist in
the metadata DB until the DAG is updated with a correct access control setting.
Close #49651
### What is the issue
https://github.com/apache/airflow/blob/56fbe90a8d0b56558c02d75f4ac5852e041cb058/airflow-core/src/airflow/dag_processing/collection.py#L177
When the DAG's access control is set to a non-existent role, the following
process will raise an exception "Failed to write serialized DAG dag_id=...".
So, how is this exception triggered?
1. `dag_was_updated` will be `True` the first time
`SerializedDagModel.write_dag` writes the serialized DAG to the database.
2. when `dag_was_updated` is `True`, `_sync_dag_perms` will be triggered to
sync DAG-specific permissions. At that moment, it detects that the role doesn't
exist and raises an error, resulting in the exception.
3. This exception will be captured, logged as a temporary import error
in the DB, and shown in the UI.
From my understanding, this sync process runs every
`MIN_SERIALIZED_DAG_UPDATE_INTERVAL`. So, what happens in the second run?
1. `dag_was_updated` will be `False` since the DAG code is not updated.
2. In this case, `_sync_dag_perms` will **NOT BE TRIGGERED** even though
the access control is set incorrectly in the DAG code.
3. Therefore, no exception will be raised and no import error will be
logged, so the import error is removed from the DB, as well as from the
UI.
### What is the fix
In the current state, `_sync_dag_perms` runs only when the DAG is updated
(i.e., `dag_was_updated` is `True`). This can be more performant because it
doesn't run for all the DAGs. However, it cannot properly handle the sync for
permissions. Therefore, the current fix is to make `_sync_dag_perms` run for
all the DAGs during the DAG sync process. I understand it might not be an ideal
fix, but I wasn't able to find a better solution due to my limited
understanding of the code. I would re [...]
---
.../src/airflow/dag_processing/collection.py | 2 +-
.../tests/unit/dag_processing/test_collection.py | 124 ++++++++++++++++++++-
2 files changed, 120 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index bfc2cb20662..e53889b945d 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -199,7 +199,7 @@ def _serialize_dag_capturing_errors(
if not dag_was_updated:
# Check and update DagCode
DagCode.update_source_code(dag.dag_id, dag.fileloc)
- elif "FabAuthManager" in conf.get("core", "auth_manager"):
+ if "FabAuthManager" in conf.get("core", "auth_manager"):
_sync_dag_perms(dag, session=session)
return []
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py
b/airflow-core/tests/unit/dag_processing/test_collection.py
index bb843221a13..5e353f0dddc 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -68,6 +68,11 @@ from tests_common.test_utils.db import (
if TYPE_CHECKING:
from kgb import SpyAgency
+mark_fab_auth_manager_test = pytest.mark.skipif(
+ condition="FabAuthManager" not in conf.get("core", "auth_manager"),
+ reason="This is only for FabAuthManager. Please set the environment
variable `AIRFLOW__CORE__AUTH_MANAGER` to
`airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager` in
`files/airflow-breeze-config/environment_variables.env` before running breeze
shell. To run the test, add the flag `--keep-env-variables` to the pytest
command.",
+)
+
def test_statement_latest_runs_one_dag():
with warnings.catch_warnings():
@@ -338,10 +343,7 @@ class TestUpdateDagParsingResults:
ser_dict = SerializedDAG.to_dict(dag)
return LazyDeserializedDAG(data=ser_dict)
- @pytest.mark.skipif(
- condition="FabAuthManager" not in conf.get("core", "auth_manager"),
- reason="This is only for FabAuthManager",
- )
+ @mark_fab_auth_manager_test
@pytest.mark.usefixtures("clean_db") # sync_perms in fab has bad session
commit hygiene
def test_sync_perms_syncs_dag_specific_perms_on_update(
self, monkeypatch, spy_agency: SpyAgency, session, time_machine,
testing_dag_bundle
@@ -376,7 +378,8 @@ class TestUpdateDagParsingResults:
# DAG isn't updated
_sync_to_db()
- spy_agency.assert_spy_not_called(sync_perms_spy)
+ # `_sync_dag_perms` should be called even the DAG isn't updated.
Otherwise, any import error will not show up until DAG is updated.
+ spy_agency.assert_spy_called_with(sync_perms_spy, dag, session=session)
# DAG is updated
dag.tags = {"new_tag"}
@@ -488,6 +491,117 @@ class TestUpdateDagParsingResults:
assert len(dag_import_error_listener.existing) == 0
assert dag_import_error_listener.new["abc.py"] ==
import_error.stacktrace
+ @patch.object(ParseImportError, "full_file_path")
+ @mark_fab_auth_manager_test
+ @pytest.mark.usefixtures("clean_db")
+ def test_import_error_persist_for_invalid_access_control_role(
+ self,
+ mock_full_path,
+ monkeypatch,
+ session,
+ time_machine,
+ dag_import_error_listener,
+ testing_dag_bundle,
+ ):
+ """
+ Test that import errors related to invalid access control role are
tracked in the DB until being fixed.
+ """
+ from airflow import settings
+
+ serialized_dags_count =
session.query(func.count(SerializedDagModel.dag_id)).scalar()
+ assert serialized_dags_count == 0
+
+ monkeypatch.setattr(settings, "MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5)
+ time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 0), tick=False)
+
+ # create a DAG and assign it a non-exist role.
+ dag = DAG(
+ dag_id="test_nonexist_access_control",
+ access_control={
+ "non_existing_role": {"can_edit", "can_read", "can_delete"},
+ },
+ )
+ dag.fileloc = "test_nonexist_access_control.py"
+ dag.relative_fileloc = "test_nonexist_access_control.py"
+ mock_full_path.return_value = "test_nonexist_access_control.py"
+
+ # the DAG processor should raise an import error when processing the
DAG above.
+ import_errors = {}
+ # run the DAG parsing.
+ update_dag_parsing_results_in_db("testing", None, [dag],
import_errors, set(), session)
+ # expect to get an error with "role does not exist" message.
+ err = import_errors.get(("testing", dag.relative_fileloc))
+ assert "AirflowException" in err
+ assert "role does not exist" in err
+ dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
+ # the DAG should contain an import error.
+ assert dag_model.has_import_errors is True
+
+ prev_import_errors = session.query(ParseImportError).all()
+ # the import error message should match.
+ assert len(prev_import_errors) == 1
+ prev_import_error = prev_import_errors[0]
+ assert prev_import_error.filename == dag.relative_fileloc
+ assert "AirflowException" in prev_import_error.stacktrace
+ assert "role does not exist" in prev_import_error.stacktrace
+
+ # this is a new import error.
+ assert len(dag_import_error_listener.new) == 1
+ assert len(dag_import_error_listener.existing) == 0
+ assert (
+ dag_import_error_listener.new["test_nonexist_access_control.py"]
== prev_import_error.stacktrace
+ )
+
+ # the DAG is serialized into the DB.
+ serialized_dags_count =
session.query(func.count(SerializedDagModel.dag_id)).scalar()
+ assert serialized_dags_count == 1
+
+ # run the update again. Even though the DAG is not updated, the
processor should raise import error since the access control is not fixed.
+ time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 5), tick=False)
+ update_dag_parsing_results_in_db("testing", None, [dag], dict(),
set(), session)
+
+ dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
+ # the DAG should contain an import error.
+ assert dag_model.has_import_errors is True
+
+ import_errors = session.query(ParseImportError).all()
+ # the import error should still in the DB.
+ assert len(import_errors) == 1
+ import_error = import_errors[0]
+ assert import_error.filename == dag.relative_fileloc
+ assert "AirflowException" in import_error.stacktrace
+ assert "role does not exist" in import_error.stacktrace
+
+ # the new import error should be the same as the previous one
+ assert len(import_errors) == len(prev_import_errors)
+ assert import_error.filename == prev_import_error.filename
+ assert import_error.filename == dag.relative_fileloc
+ assert import_error.stacktrace == prev_import_error.stacktrace
+
+ # there is a new error and an existing error.
+ assert len(dag_import_error_listener.new) == 1
+ assert len(dag_import_error_listener.existing) == 1
+ assert (
+ dag_import_error_listener.new["test_nonexist_access_control.py"]
== prev_import_error.stacktrace
+ )
+
+ # run the update again, but the incorrect access control configuration
is removed.
+ time_machine.move_to(tz.datetime(2020, 1, 5, 0, 0, 10), tick=False)
+ dag.access_control = None
+ update_dag_parsing_results_in_db("testing", None, [dag], dict(),
set(), session)
+
+ dag_model: DagModel = session.get(DagModel, (dag.dag_id,))
+ # the import error should be cleared.
+ assert dag_model.has_import_errors is False
+
+ import_errors = session.query(ParseImportError).all()
+ # the import error should be cleared.
+ assert len(import_errors) == 0
+
+ # no import error should be introduced.
+ assert len(dag_import_error_listener.new) == 1
+ assert len(dag_import_error_listener.existing) == 1
+
def test_new_import_error_replaces_old(self, session,
dag_import_error_listener, testing_dag_bundle):
"""
Test that existing import error is updated and new record not created