hkc-8010 opened a new issue, #66483:
URL: https://github.com/apache/airflow/issues/66483

   ### Apache Airflow version
   
   3.1.8 and current main
   
   ### What happened?
   
   We found a case where DAG-level callbacks for versioned DAG bundles are 
fetched and queued by the dag processor, but are then treated as orphaned 
before the callback executes.
   
   The observed sequence is:
   
   1. Scheduler creates a `DagCallbackRequest`
   2. Dag processor fetches it from the DB
   3. Dag processor queues it for file processing
   4. Dag processor starts or prepares callback processing for the file
   5. The same file is later treated as "not present" (orphaned): its queue 
entry is purged, or its processor is stopped, before `_execute_dag_callbacks` 
runs
   
   This is distinct from the stale serialized DAG metadata problem in #66301 
and the fix in #66474.
   
   That earlier issue was about `SerializedDagModel` not refreshing when only 
bundle version changed. Here, the failure is later in the dag processor 
manager's callback/orphan cleanup path.
   
   ### What you think should happen instead?
   
   If a callback request is queued for `bundle_name=X`, `rel_path=Y`, and the 
same file is still present in the current bundle scan, it should not be treated 
as removed solely because the callback queue entry includes `bundle_version` 
while the scanned "present files" entry does not.
   
   In other words, present/orphan checks in the dag processor should determine 
file presence from these two fields only:
   
   - `bundle_name`
   - `rel_path`
   
   and should not require exact `DagFileInfo` equality including 
`bundle_version`.
   
   ### How to reproduce
   
   A minimal pattern is:
   
   1. Use a DAG bundle implementation with `supports_versioning=True`
   2. Have a DAG with a DAG-level `on_failure_callback`
   3. Trigger a failing DAG run so a `DagCallbackRequest` is created
   4. In the dag processor:
      - `_add_callback_to_queue()` creates a `DagFileInfo` with 
`bundle_version=<version>`
      - `_refresh_dag_bundles()` builds `known_files` using 
`DagFileInfo(rel_path=..., bundle_name=..., bundle_path=...)` with no 
`bundle_version`
   5. Let `purge_removed_files_from_queue()` / `terminate_orphan_processes()` 
run
   6. Observe that the versioned callback file can be treated as absent even 
though the same `bundle_name + rel_path` is still present in the scanned files
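   
   Steps 4 to 6 can be simulated without an Airflow install. This is a minimal sketch under stated assumptions: `FileInfo` is a hypothetical stand-in for `DagFileInfo` (a frozen dataclass whose equality includes `bundle_version`), and the list comprehensions only approximate full-equality filtering against the `present` set; they are not the manager's actual code.
   
   ```python
   from __future__ import annotations
   
   from dataclasses import dataclass
   from pathlib import Path
   
   
   # Hypothetical stand-in for DagFileInfo; not the real Airflow class.
   # Equality and hashing cover every field, including bundle_version.
   @dataclass(frozen=True)
   class FileInfo:
       rel_path: Path
       bundle_name: str
       bundle_version: str | None = None
   
   
   # Step 4a: the callback queue entry carries a bundle version.
   queued = [FileInfo(Path("dags/example.py"), "main", "v1")]
   
   # Step 4b: the bundle scan produces entries with no version.
   present = {FileInfo(Path("dags/example.py"), "main")}
   
   # Steps 5-6: filtering by full equality drops the versioned entry,
   # even though bundle_name and rel_path match a scanned file.
   kept = [f for f in queued if f in present]
   purged = [f for f in queued if f not in present]
   ```
   
   With these inputs `kept` ends up empty and the versioned entry lands in `purged`, which mirrors the behaviour reported above.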
   
   ### Operating System
   
   Linux
   
   ### Deployment
   
   Other
   
   ### Anything else?
   
   The relevant code path in 
`airflow-core/src/airflow/dag_processing/manager.py` is:
   
   - `_add_callback_to_queue()` creates:
     ```python
     file_info = DagFileInfo(
         rel_path=Path(request.filepath),
         bundle_path=bundle.path,
         bundle_name=request.bundle_name,
         bundle_version=request.bundle_version,
     )
     ```
   
   - `_refresh_dag_bundles()` builds `known_files` with:
     ```python
     DagFileInfo(rel_path=p, bundle_name=bundle.name, bundle_path=bundle.path)
     ```
   
   - `purge_removed_files_from_queue()` and `terminate_orphan_processes()` then 
use full `DagFileInfo` equality against the `present` set
   
   Because `DagFileInfo` includes `bundle_version` in equality/hashing, a 
versioned callback entry and an unversioned scanned entry for the same file do 
not compare equal.
   
   That means the callback queue entry or active processor can be treated as 
orphaned even though the file is still present.
   
   A small local proof of the mismatch is:
   
   ```python
   from pathlib import Path
   from airflow.dag_processing.manager import DagFileInfo
   
   versioned = DagFileInfo(
       rel_path=Path("dags/example.py"),
       bundle_name="main",
       bundle_path=Path("/tmp/bundle"),
       bundle_version="v1",
   )
   present = DagFileInfo(
       rel_path=Path("dags/example.py"),
       bundle_name="main",
       bundle_path=Path("/tmp/bundle"),
   )
   
   assert versioned != present
   ```
   
   The fix I tested locally is to keep callback identity version-aware, but 
make present/orphan checks compare only a stable file-presence key of:
   
   - `bundle_name`
   - `rel_path`
   
   That preserves distinct callback entries per version while preventing false 
orphan cleanup.
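   
   The idea can be sketched roughly as follows. This is an illustration, not the patch itself: `FileInfo` is a hypothetical stand-in for `DagFileInfo`, and `presence_key` and `purge_absent` are hypothetical helper names chosen for this example.
   
   ```python
   from __future__ import annotations
   
   from dataclasses import dataclass
   from pathlib import Path
   
   
   # Hypothetical stand-in for DagFileInfo; not the real Airflow class.
   @dataclass(frozen=True)
   class FileInfo:
       rel_path: Path
       bundle_name: str
       bundle_version: str | None = None
   
   
   def presence_key(info: FileInfo) -> tuple[str, Path]:
       """Hypothetical helper: identify a file by bundle and path, ignoring version."""
       return (info.bundle_name, info.rel_path)
   
   
   def purge_absent(queue: list[FileInfo], present: set[FileInfo]) -> list[FileInfo]:
       """Keep queue entries whose file still appears in the scan."""
       present_keys = {presence_key(f) for f in present}
       return [f for f in queue if presence_key(f) in present_keys]
   
   
   queue = [
       FileInfo(Path("dags/example.py"), "main", "v1"),
       FileInfo(Path("dags/example.py"), "main", "v2"),
       FileInfo(Path("dags/deleted.py"), "main", "v1"),
   ]
   present = {FileInfo(Path("dags/example.py"), "main")}
   
   # Both versioned entries for example.py survive; deleted.py is dropped.
   kept = purge_absent(queue, present)
   ```
   
   The queue entries themselves still compare by full equality, so the `v1` and `v2` callbacks remain separate items; only the presence check ignores the version.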
   
   ### Are you willing to submit PR?
   
   Yes

