bahiaxur opened a new issue, #51367:
URL: https://github.com/apache/airflow/issues/51367

   ### Apache Airflow version
   
   3.0.1
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I've been testing the `slim-3.0.1-python3.12` image locally and 
experimenting with DAG-level callbacks. However, the callbacks consistently 
failed, throwing a `KeyError` exception. When I printed the `context` 
parameter, I was surprised to find it was an empty dictionary. This seemed very 
odd, so I checked the source code and found that, according to 
`airflow/airflow-core/src/airflow/dag_processing/processor.py`, the context 
should not be empty:
   
   ```
   def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: FilteringBoundLogger) -> None:
       dag = dagbag.dags[request.dag_id]

       callbacks = dag.on_failure_callback if request.is_failure_callback else dag.on_success_callback
       if not callbacks:
           log.warning("Callback requested, but dag didn't have any", dag_id=request.dag_id)
           return

       callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
       # TODO:We need a proper context object!
       context: Context = {  # type: ignore[assignment]
           "dag": dag,
           "run_id": request.run_id,
           "reason": request.msg,
       }

       for callback in callbacks:
           log.info(
               "Executing on_%s dag callback",
               "failure" if request.is_failure_callback else "success",
               dag_id=request.dag_id,
           )
           try:
               callback(context)
           except Exception:
               log.exception("Callback failed", dag_id=request.dag_id)
               Stats.incr("dag.callback_exceptions", tags={"dag_id": request.dag_id})
   ```
   
   Then, I checked the code inside a Docker container created from that image and found that it was:
   
   ```
   def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: FilteringBoundLogger) -> None:
       dag = dagbag.dags[request.dag_id]

       callbacks = dag.on_failure_callback if request.is_failure_callback else dag.on_success_callback
       if not callbacks:
           log.warning("Callback requested, but dag didn't have any", dag_id=request.dag_id)
           return

       callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
       # TODO:We need a proper context object!
       context: Context = {}  # type: ignore[assignment]

       for callback in callbacks:
           log.info(
               "Executing on_%s dag callback",
               "failure" if request.is_failure_callback else "success",
               dag_id=request.dag_id,
           )
           try:
               callback(context)
           except Exception:
               log.exception("Callback failed", dag_id=request.dag_id)
               Stats.incr("dag.callback_exceptions", tags={"dag_id": request.dag_id})
   ```
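   To make the consequence of that empty dict concrete, here is a minimal, Airflow-free sketch of what happens when a callback like mine indexes into the context it receives (names are illustrative, not from the Airflow codebase):

   ```python
   def failure_callback(context):
       # mirrors what a typical DAG-level callback does with its context argument
       return context["run_id"]

   # the shipped code effectively passes an empty dict
   try:
       failure_callback({})
   except KeyError as exc:
       print(f"KeyError: {exc}")  # prints: KeyError: 'run_id'
   ```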
   It's really odd, because the image layers on Docker Hub show the commit SHA `11376a7159fb420b3c50fce7693bae34f1c79e60`, and the repository at that commit clearly shows a non-empty context. I also noticed that the code installed via `pip install 'apache-airflow==3.0.1'` or `uv add 'apache-airflow==3.0.1'` has the same problem.
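   For anyone who wants to double-check their own environment, `inspect.getsource` on the imported module prints exactly what is installed, rather than what the repo shows. The sketch below demonstrates the technique on a stdlib function so it runs anywhere; the same call works against `airflow.dag_processing.processor._execute_dag_callbacks` in an environment where `apache-airflow==3.0.1` is installed:

   ```python
   import inspect
   import json

   # Show the source that is actually installed in this environment.
   # Swap `json.dumps` for airflow.dag_processing.processor._execute_dag_callbacks
   # in a venv with apache-airflow==3.0.1 installed to compare against the repo.
   src = inspect.getsource(json.dumps)
   print(src.splitlines()[0])  # the installed function's signature line
   ```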
   
   ### What you think should happen instead?
   
   The Docker image and the PyPI package should reflect the repository at that commit.
   
   ### How to reproduce
   
   1. Write a simple DAG that should invoke a callback on failure or success, like:
   ```
   from datetime import datetime
   import logging
   
   from airflow.models.dag import dag
   from airflow.decorators import task
   
   def failure_callback(context):
       logging.info(f"CONTEXT: {context}")
       run_id = context["run_id"]
       logging.info(f"RUN_ID: {run_id}")
   
   def success_callback(context):
       logging.info(f"CONTEXT: {context}")
    run_id = context["run_id"]
    logging.info(f"RUN_ID: {run_id}")
   
   @task()
   def foo():
       raise Exception("error")
   
   @dag(
       on_failure_callback=failure_callback,
       on_success_callback=success_callback,
       start_date=datetime.now(),
   )
   def test_callback():
   
       foo()
   
   _ = test_callback()
   ```
   2. Download the `docker-compose.yaml` provided by Airflow
   3. Adjust the env variables to setup the containers and run them
   4. Trigger the DAG and check the dag_processor logs at `${AIRFLOW_PROJ_DIR:-.}/logs`
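   Until the shipped code matches the repo, a defensive version of the callback avoids the crash by reading the context with `dict.get` instead of indexing (a workaround sketch, not a fix for the underlying packaging mismatch):

   ```python
   import logging

   def failure_callback(context):
       # context is {} on the affected 3.0.1 builds, so avoid direct indexing
       run_id = context.get("run_id", "<unknown>")
       logging.info("CONTEXT: %s", context)
       logging.info("RUN_ID: %s", run_id)
       return run_id
   ```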
   
   ### Operating System
   
   Ubuntu 24.04.2 LTS
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-common-compat==1.6.1
   apache-airflow-providers-common-io==1.5.4
   apache-airflow-providers-common-sql==1.27.0
   apache-airflow-providers-smtp==2.0.3
   apache-airflow-providers-standard==1.1.0
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   docker compose version
   
   > Docker Compose version v2.34.0
   
   docker version
   
   > Client: Docker Engine - Community
   >  Version:           28.0.4
   >  API version:       1.48
   >  Go version:        go1.23.7
   >  Git commit:        b8034c0
   >  Built:             Tue Mar 25 15:07:16 2025
   >  OS/Arch:           linux/amd64
   >  Context:           default
   > 
   > Server: Docker Engine - Community
   >  Engine:
   >   Version:          28.0.4
   >   API version:      1.48 (minimum version 1.24)
   >   Go version:       go1.23.7
   >   Git commit:       6430e49
   >   Built:            Tue Mar 25 15:07:16 2025
   >   OS/Arch:          linux/amd64
   >   Experimental:     false
   >  containerd:
   >   Version:          1.7.27
   >   GitCommit:        05044ec0a9a75232cad458027ca83437aae3f4da
   >  runc:
   >   Version:          1.2.5
   >   GitCommit:        v1.2.5-0-g59923ef
   >  docker-init:
   >   Version:          0.19.0
   >   GitCommit:        de40ad0
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
