hkc-8010 opened a new issue, #66483:
URL: https://github.com/apache/airflow/issues/66483
### Apache Airflow version
3.1.8 and current main
### What happened?
We found a case where DAG-level callbacks for versioned DAG bundles are
fetched and queued by the dag processor, but are then treated as orphaned
before the callback executes.
The observed sequence is:
1. Scheduler creates a `DagCallbackRequest`
2. Dag processor fetches it from the DB
3. Dag processor queues it for file processing
4. Dag processor starts or prepares callback processing for the file
5. The same callback file is later treated as "not present" / orphaned and
is purged or the processor is stopped before `_execute_dag_callbacks` runs
This is distinct from the stale serialized DAG metadata problem in #66301
and the fix in #66474.
That earlier issue was about `SerializedDagModel` not refreshing when only
the bundle version changed. Here, the failure occurs later, in the dag
processor manager's callback/orphan cleanup path.
### What you think should happen instead?
If a callback request is queued for `bundle_name=X`, `rel_path=Y`, and the
same file is still present in the current bundle scan, it should not be treated
as removed solely because the callback queue entry includes `bundle_version`
while the scanned "present files" entry does not.
In other words, present/orphan checks in the dag processor should decide file
presence from only:
- `bundle_name`
- `rel_path`
and should not require exact `DagFileInfo` equality, which also compares
`bundle_version`.
### How to reproduce
A minimal pattern is:
1. Use a DAG bundle implementation with `supports_versioning=True`
2. Have a DAG with a DAG-level `on_failure_callback`
3. Trigger a failing DAG run so a `DagCallbackRequest` is created
4. In the dag processor:
   - `_add_callback_to_queue()` creates a `DagFileInfo` with
     `bundle_version=<version>`
   - `_refresh_dag_bundles()` builds `known_files` using
     `DagFileInfo(rel_path=..., bundle_name=..., bundle_path=...)` with no
     `bundle_version`
5. Let `purge_removed_files_from_queue()` / `terminate_orphan_processes()`
run
6. Observe that the versioned callback file can be treated as absent even
though the same `bundle_name + rel_path` is still present in the scanned files
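
Steps 4 to 6 can be simulated without a running Airflow. The sketch below is an illustration only: `FileInfo` is a hypothetical stand-in for `DagFileInfo` (a frozen dataclass whose equality and hash cover every field), and the purge is modeled as a plain membership filter against the scanned set, which is an assumption about the shape of the real check rather than a copy of the manager code.

```python
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


# Hypothetical stand-in for DagFileInfo: the generated __eq__/__hash__
# compare all fields, including bundle_version.
@dataclass(frozen=True)
class FileInfo:
    rel_path: Path
    bundle_name: str
    bundle_path: Path
    bundle_version: Optional[str] = None


# Step 4a: the callback queue entry carries the bundle version.
queued = FileInfo(Path("dags/example.py"), "main", Path("/tmp/bundle"), "v1")

# Step 4b: the bundle scan records the same file without a version.
present = {FileInfo(Path("dags/example.py"), "main", Path("/tmp/bundle"))}

# Step 5: purge modeled as a membership filter against `present`.
file_queue = deque([queued])
file_queue = deque(f for f in file_queue if f in present)

# Step 6: the queue is now empty although the scan found the file.
print(len(file_queue))
```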
### Operating System
Linux
### Deployment
Other
### Anything else?
The relevant code path in
`airflow-core/src/airflow/dag_processing/manager.py` is:
- `_add_callback_to_queue()` creates:
```python
file_info = DagFileInfo(
    rel_path=Path(request.filepath),
    bundle_path=bundle.path,
    bundle_name=request.bundle_name,
    bundle_version=request.bundle_version,
)
```
- `_refresh_dag_bundles()` builds `known_files` with:
```python
DagFileInfo(rel_path=p, bundle_name=bundle.name, bundle_path=bundle.path)
```
- `purge_removed_files_from_queue()` and `terminate_orphan_processes()` then
use full `DagFileInfo` equality against the `present` set
Because `DagFileInfo` includes `bundle_version` in equality/hashing, a
versioned callback entry and an unversioned scanned entry for the same file do
not compare equal.
That means the callback queue entry or active processor can be treated as
orphaned even though the file is still present.
A small local proof of the mismatch is:
```python
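from pathlib import Path

from airflow.dag_processing.manager import DagFileInfo

# Entry as created for a queued callback: includes the bundle version.
versioned = DagFileInfo(
    rel_path=Path("dags/example.py"),
    bundle_name="main",
    bundle_path=Path("/tmp/bundle"),
    bundle_version="v1",
)
# Entry as created by the bundle scan: no bundle version.
present = DagFileInfo(
    rel_path=Path("dags/example.py"),
    bundle_name="main",
    bundle_path=Path("/tmp/bundle"),
)
# Same file, but the two entries do not compare equal.
assert versioned != present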
```
The fix I tested locally is to keep callback identity version-aware, but
make present/orphan checks compare only a stable file-presence key of:
- `bundle_name`
- `rel_path`
That preserves distinct callback entries per version while preventing false
orphan cleanup.
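
A minimal sketch of that approach, under stated assumptions: `FileInfo` is again a hypothetical stand-in for `DagFileInfo`, and `presence_key` and `purge_removed` are illustrative names, not existing Airflow functions. The queue entries keep their full, version-aware identity; only the presence check is reduced to `(bundle_name, rel_path)`.

```python
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


# Hypothetical stand-in for DagFileInfo with full-field equality.
@dataclass(frozen=True)
class FileInfo:
    rel_path: Path
    bundle_name: str
    bundle_path: Path
    bundle_version: Optional[str] = None


def presence_key(info):
    """Hypothetical helper: identify a file by bundle and relative path only."""
    return (info.bundle_name, info.rel_path)


def purge_removed(queue, present):
    """Keep queue entries whose file was seen by the scan, ignoring version."""
    present_keys = {presence_key(p) for p in present}
    return deque(f for f in queue if presence_key(f) in present_keys)


bundle = Path("/tmp/bundle")
v1 = FileInfo(Path("dags/example.py"), "main", bundle, "v1")
v2 = FileInfo(Path("dags/example.py"), "main", bundle, "v2")
gone = FileInfo(Path("dags/deleted.py"), "main", bundle, "v1")

# The scan sees only example.py, recorded without a version.
present = {FileInfo(Path("dags/example.py"), "main", bundle)}

kept = purge_removed(deque([v1, v2, gone]), present)
```

With this shape, `v1` and `v2` remain distinct queue entries and both survive the purge, while the entry for the file that is missing from the scan is still removed.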
### Are you willing to submit PR?
Yes