bahiaxur opened a new issue, #51367:
URL: https://github.com/apache/airflow/issues/51367
### Apache Airflow version
3.0.1
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
I've been testing the `slim-3.0.1-python3.12` image locally and
experimenting with DAG-level callbacks. However, the callbacks consistently
failed, throwing a `KeyError` exception. When I printed the `context`
parameter, I was surprised to find it was an empty dictionary. This seemed very
odd, so I checked the source code and found that, according to
`airflow/airflow-core/src/airflow/dag_processing/processor.py`, the context
should not be empty:
```
def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log:
FilteringBoundLogger) -> None:
dag = dagbag.dags[request.dag_id]
callbacks = dag.on_failure_callback if request.is_failure_callback else
dag.on_success_callback
if not callbacks:
log.warning("Callback requested, but dag didn't have any",
dag_id=request.dag_id)
return
callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
# TODO:We need a proper context object!
context: Context = { # type: ignore[assignment]
"dag": dag,
"run_id": request.run_id,
"reason": request.msg,
}
for callback in callbacks:
log.info(
"Executing on_%s dag callback",
"failure" if request.is_failure_callback else "success",
dag_id=request.dag_id,
)
try:
callback(context)
except Exception:
log.exception("Callback failed", dag_id=request.dag_id)
Stats.incr("dag.callback_exceptions", tags={"dag_id":
request.dag_id})
```
Then, I checked the code inside the Docker container generated from that
image and noticed that it was:
```
def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log:
FilteringBoundLogger) -> None:
dag = dagbag.dags[request.dag_id]
callbacks = dag.on_failure_callback if request.is_failure_callback else
dag.on_success_callback
if not callbacks:
log.warning("Callback requested, but dag didn't have any",
dag_id=request.dag_id)
return
callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
# TODO:We need a proper context object!
context: Context = {} # type: ignore[assignment]
for callback in callbacks:
log.info(
"Executing on_%s dag callback",
"failure" if request.is_failure_callback else "success",
dag_id=request.dag_id,
)
try:
callback(context)
except Exception:
log.exception("Callback failed", dag_id=request.dag_id)
Stats.incr("dag.callback_exceptions", tags={"dag_id":
request.dag_id})
```
It's really odd, because checking the layers in Docker Hub shows the commit
SHA is `11376a7159fb420b3c50fce7693bae34f1c79e60`. When I check the repository
at that commit, the code clearly shows a non-empty context. I also noticed the
code downloaded via `pip install 'apache-airflow==3.0.1'` or `uv add
'apache-airflow==3.0.1'` also have the same problem.
### What you think should happen instead?
The docker image and Pypi codebase should reflect the repo at that commit.
### How to reproduce
1. Write a simple DAG that should evoke a callback on failure or success
like:
```
from datetime import datetime
import logging
from airflow.models.dag import dag
from airflow.decorators import task
def failure_callback(context):
logging.info(f"CONTEXT: {context}")
run_id = context["run_id"]
logging.info(f"RUN_ID: {run_id}")
def success_callback(context):
logging.info(f"CONTEXT: {context}")
context["run_id"]
logging.info(f"RUN_ID: {run_id}")
@task()
def foo():
raise Exception("error")
@dag(
on_failure_callback=failure_callback,
on_success_callback=success_callback,
start_date=datetime.now(),
)
def test_callback():
foo()
_ = test_callback()
```
2. Download the docker-compose.yaml provided by airflow
3. Adjust the env variables to setup the containers and run them
4. Trigger the DAG and check the logs of the dag_processor at
${AIRFLOW_PROJ_DIR:-.}/logs
### Operating System
Ubuntu 24.04.2 LTS
### Versions of Apache Airflow Providers
apache-airflow-providers-common-compat==1.6.1
apache-airflow-providers-common-io==1.5.4
apache-airflow-providers-common-sql==1.27.0
apache-airflow-providers-smtp==2.0.3
apache-airflow-providers-standard==1.1.0
### Deployment
Docker-Compose
### Deployment details
docker compose version
> Docker Compose version v2.34.0
docker version
> Client: Docker Engine - Community
> Version: 28.0.4
> API version: 1.48
> Go version: go1.23.7
> Git commit: b8034c0
> Built: Tue Mar 25 15:07:16 2025
> OS/Arch: linux/amd64
> Context: default
>
> Server: Docker Engine - Community
> Engine:
> Version: 28.0.4
> API version: 1.48 (minimum version 1.24)
> Go version: go1.23.7
> Git commit: 6430e49
> Built: Tue Mar 25 15:07:16 2025
> OS/Arch: linux/amd64
> Experimental: false
> containerd:
> Version: 1.7.27
> GitCommit: 05044ec0a9a75232cad458027ca83437aae3f4da
> runc:
> Version: 1.2.5
> GitCommit: v1.2.5-0-g59923ef
> docker-init:
> Version: 0.19.0
> GitCommit: de40ad0
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]