GitHub user benitezfede edited a discussion: DAG callback `context` does not 
have `dag_run` set in Airflow 3 but it is set in Airflow 2.

Hi,
In previous versions of Airflow I was able to get the `dag_run` information 
from the `on_success_callback` `context` when a DAG success callback was 
triggered.

Now, in Airflow 3, I see that `dag_run` is no longer populated in the DAG 
callback `context`. In my opinion this is a bug; it is also inconsistent with 
the task callbacks, where it IS populated.

I read the 
[documentation](https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html)
 on the `context` template variables, which lists `dag_run` among them. The code 
[here](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/context.py#L42C14-L42C28)
 also declares `dag_run`, so I cannot understand why it is not being populated.

To reproduce the issue, I created this DAG, which prints the `context` keys 
for both the DAG-level and task-level callbacks:

```python
from datetime import datetime, timedelta
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Debug callback functions
def debug_task_success_callback(context):
    """Debug callback for task success - prints all context information"""
    print("=" * 80)
    print("TASK SUCCESS CALLBACK DEBUG")
    print("=" * 80)
    print(f"Context keys: {list(context.keys())}")
    print(f"Context type: {type(context)}")
    
    # Print each context key and its value
    for key, value in context.items():
        print(f"  {key}: {type(value)} = {value}")
        if hasattr(value, '__dict__'):
            print(f"    Attributes: {[attr for attr in dir(value) if not attr.startswith('_')]}")
    
    # Specifically check for dag_run
    if 'dag_run' in context:
        print(f"\nDAG_RUN FOUND:")
        print(f"  Type: {type(context['dag_run'])}")
        print(f"  Attributes: {[attr for attr in dir(context['dag_run']) if not attr.startswith('_')]}")
        print(f"  Dict: {context['dag_run'].__dict__}")
    else:
        print(f"\nDAG_RUN NOT FOUND in context!")
    
    print("=" * 80)

def debug_dag_success_callback(context):
    """Debug callback for DAG success - prints all context information"""
    print("=" * 80)
    print("DAG SUCCESS CALLBACK DEBUG")
    print("=" * 80)
    print(f"Context keys: {list(context.keys())}")
    print(f"Context type: {type(context)}")
    
    # Print each context key and its value
    for key, value in context.items():
        print(f"  {key}: {type(value)} = {value}")
        if hasattr(value, '__dict__'):
            print(f"    Attributes: {[attr for attr in dir(value) if not attr.startswith('_')]}")
    
    # Specifically check for dag_run
    if 'dag_run' in context:
        print(f"\nDAG_RUN FOUND:")
        print(f"  Type: {type(context['dag_run'])}")
        print(f"  Attributes: {[attr for attr in dir(context['dag_run']) if not attr.startswith('_')]}")
        print(f"  Dict: {context['dag_run'].__dict__}")
    else:
        print(f"\nDAG_RUN NOT FOUND in context!")
    
    print("=" * 80)

def simple_task():
    """Simple task that just prints a message"""
    print("Hello from simple task!")
    return "task completed"

# Define the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email': ['airf...@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=10),
    # Add task-level callback
    'on_success_callback': debug_task_success_callback,
}

dag = DAG(
    'debug_context_dag',
    default_args=default_args,
    description='A simple DAG to debug context in Airflow 3.0',
    schedule=timedelta(days=1),
    catchup=False,
    tags=['debug', 'context'],
    # Add DAG-level callback
    on_success_callback=debug_dag_success_callback,
)

# Define the task
task1 = PythonOperator(
    task_id='simple_task',
    python_callable=simple_task,
    dag=dag,
)

# Set task dependencies (none needed for single task)
task1 
```

The output of running this DAG shows that for the DAG callback the `dag_run` is 
not present:

```
DAG_RUN NOT FOUND in context!
```

The context for DAG callbacks has only these keys:

```
"Context keys: ['dag', 'run_id', 'reason']"
```

For the task callback we see the following keys:

```
Context keys: ['dag', 'inlets', 'map_index_template', 'outlets', 'run_id', 
'task', 'task_instance', 'ti', 'outlet_events', 'inlet_events', 'macros', 
'params', 'var', 'conn', 'dag_run', 'triggering_asset_events', 
'task_instance_key_str', 'task_reschedule_count', 'prev_start_date_success', 
'prev_end_date_success', 'logical_date', 'ds', 'ds_nodash', 'ts', 'ts_nodash', 
'ts_nodash_with_tz', 'data_interval_end', 'data_interval_start', 
'prev_data_interval_start_success', 'prev_data_interval_end_success', 
'templates_dict', 'exception']
```

And it has the `dag_run` as well:

```
DAG_RUN FOUND:: chan="stdout": source="task"
[2025-07-21, 23:28:17] INFO -   Type: <class 
'airflow.sdk.api.datamodels._generated.DagRun'>: chan="stdout": source="task"
[2025-07-21, 23:28:17] INFO -   Attributes: ['clear_number', 'conf', 
'construct', 'consumed_asset_events', 'copy', 'dag_id', 'data_interval_end', 
'data_interval_start', 'dict', 'end_date', 'from_orm', 'json', 'logical_date', 
'model_computed_fields', 'model_config', 'model_construct', 'model_copy', 
'model_dump', 'model_dump_json', 'model_extra', 'model_fields', 
'model_fields_set', 'model_json_schema', 'model_parametrized_name', 
'model_post_init', 'model_rebuild', 'model_validate', 'model_validate_json', 
'model_validate_strings', 'parse_file', 'parse_obj', 'parse_raw', 'run_after', 
'run_id', 'run_type', 'schema', 'schema_json', 'start_date', 
'update_forward_refs', 'validate']
```

So there is a large difference between the information loaded into the 
`context` for task callbacks and for the DAG callback. Is this expected, or is 
it a bug? As I said, it seems to be a bug.
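Until this is resolved, a callback can be written defensively so the same function works with both shapes of the `context`. The sketch below is a minimal, self-contained illustration (`get_run_info` is a hypothetical helper, not an Airflow API, and plain dicts stand in for the real callback contexts):

```python
def get_run_info(context):
    """Return (dag_id, run_id), tolerating a missing `dag_run` key.

    Falls back to the `dag` and `run_id` keys, which are the only run
    identifiers present in the Airflow 3 DAG-level callback context.
    """
    dag_run = context.get("dag_run")
    if dag_run is not None:
        return dag_run.dag_id, dag_run.run_id
    dag = context.get("dag")
    # A DAG object exposes `dag_id`; fall back to the raw value otherwise.
    dag_id = getattr(dag, "dag_id", dag)
    return dag_id, context.get("run_id")
```

For example, `get_run_info({"dag": "debug_context_dag", "run_id": "r1", "reason": "success"})` returns `("debug_context_dag", "r1")` even though no `dag_run` key is present.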

It seems to be a matter of adding `dag_run: self,` 
[here](https://github.com/apache/airflow/blob/6b618efa91b4ef80c4821537b30f14a49c2badb6/airflow-core/src/airflow/models/dagrun.py#L1356).
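To make the suggested fix concrete, here is a hypothetical sketch of the idea. The class and method names below are illustrative only and do not match Airflow's internals; the real change would go where `DagRun` builds the DAG-level callback context:

```python
class FakeDagRun:
    """Toy stand-in for a DAG run object, for illustration only."""

    def __init__(self, dag, run_id):
        self.dag = dag
        self.run_id = run_id

    def build_callback_context(self, reason):
        # Today the DAG-level callback context carries only three keys
        # (dag, run_id, reason); the proposal is to also pass the run
        # itself under "dag_run", matching the task-level callbacks.
        return {
            "dag": self.dag,
            "run_id": self.run_id,
            "reason": reason,
            "dag_run": self,  # the proposed one-line addition
        }
```

With this change, a DAG-level callback would receive the same `dag_run` object that task-level callbacks already get.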

Thanks,
Fede

GitHub link: https://github.com/apache/airflow/discussions/53618
