GitHub user benitezfede edited a discussion: DAG callback `context` does not have `dag_run` set in Airflow 3 but it is set in Airflow 2.
Hi, In the previous versions of airflow I was able to get the `dag_run` information from the `on_success_callback` `context` when a DAG succeeded callback was triggered. Now, in Airflow 3, I see that the `dag_run` is no longer populated in the callback `context`. In my opinion this is a bug, also it is inconsistent with the `tasks` callbacks as IT IS populated there. I read the documentation about callbacks [here](https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html) and it points to [this](https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html) the `context` template variables which have the `dag_run`. Also the code [here](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/context.py#L42C14-L42C28) shows the `dag_run` so I cannot understand why it is not being populated. In order to reproduce the issue, I created this DAG which prints the keys of the `context` for DAG and tasks callbacks: ``` from datetime import datetime, timedelta import logging from airflow import DAG from airflow.operators.python import PythonOperator # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Debug callback functions def debug_task_success_callback(context): """Debug callback for task success - prints all context information""" print("=" * 80) print("TASK SUCCESS CALLBACK DEBUG") print("=" * 80) print(f"Context keys: {list(context.keys())}") print(f"Context type: {type(context)}") # Print each context key and its value for key, value in context.items(): print(f" {key}: {type(value)} = {value}") if hasattr(value, '__dict__'): print(f" Attributes: {[attr for attr in dir(value) if not attr.startswith('_')]}") # Specifically check for dag_run if 'dag_run' in context: print(f"\nDAG_RUN FOUND:") print(f" Type: {type(context['dag_run'])}") print(f" Attributes: {[attr for attr in dir(context['dag_run']) if not attr.startswith('_')]}") print(f" Dict: {context['dag_run'].__dict__}") else: print(f"\nDAG_RUN NOT FOUND in context!") print("=" * 80) def debug_dag_success_callback(context): """Debug callback for DAG success - prints all context information""" print("=" * 80) print("DAG SUCCESS CALLBACK DEBUG") print("=" * 80) print(f"Context keys: {list(context.keys())}") print(f"Context type: {type(context)}") # Print each context key and its value for key, value in context.items(): print(f" {key}: {type(value)} = {value}") if hasattr(value, '__dict__'): print(f" Attributes: {[attr for attr in dir(value) if not attr.startswith('_')]}") # Specifically check for dag_run if 'dag_run' in context: print(f"\nDAG_RUN FOUND:") print(f" Type: {type(context['dag_run'])}") print(f" Attributes: {[attr for attr in dir(context['dag_run']) if not attr.startswith('_')]}") print(f" Dict: {context['dag_run'].__dict__}") else: print(f"\nDAG_RUN NOT FOUND in context!") print("=" * 80) def simple_task(): """Simple task that just prints a message""" print("Hello from simple task!") return "task completed" # Define the DAG default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'email': ['airf...@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(seconds=10), # Add task-level callback 'on_success_callback': debug_task_success_callback, } dag = DAG( 'debug_context_dag', default_args=default_args, description='A simple DAG to debug context in Airflow 3.0', schedule=timedelta(days=1), catchup=False, tags=['debug', 'context'], # Add DAG-level callback on_success_callback=debug_dag_success_callback, ) # Define the task task1 = PythonOperator( task_id='simple_task', python_callable=simple_task, dag=dag, ) # Set task dependencies (none needed for single task) task1 ``` The output of running this DAG shows that for the DAG callback the `dag_run` is not present: ``` DAG_RUN NOT FOUND in context! ``` The context for DAG callbacks has only these keys: ``` "Context keys: ['dag', 'run_id', 'reason']" ``` For the task callback we see the following keys: ``` Context keys: ['dag', 'inlets', 'map_index_template', 'outlets', 'run_id', 'task', 'task_instance', 'ti', 'outlet_events', 'inlet_events', 'macros', 'params', 'var', 'conn', 'dag_run', 'triggering_asset_events', 'task_instance_key_str', 'task_reschedule_count', 'prev_start_date_success', 'prev_end_date_success', 'logical_date', 'ds', 'ds_nodash', 'ts', 'ts_nodash', 'ts_nodash_with_tz', 'data_interval_end', 'data_interval_start', 'prev_data_interval_start_success', 'prev_data_interval_end_success', 'templates_dict', 'exception'] ``` And it has the `dag_run` as well: ``` DAG_RUN FOUND:: chan="stdout": source="task" [2025-07-21, 23:28:17] INFO - Type: <class 'airflow.sdk.api.datamodels._generated.DagRun'>: chan="stdout": source="task" [2025-07-21, 23:28:17] INFO - Attributes: ['clear_number', 'conf', 'construct', 'consumed_asset_events', 'copy', 'dag_id', 'data_interval_end', 'data_interval_start', 'dict', 'end_date', 'from_orm', 'json', 'logical_date', 'model_computed_fields', 'model_config', 'model_construct', 'model_copy', 'model_dump', 'model_dump_json', 'model_extra', 'model_fields', 'model_fields_set', 'model_json_schema', 'model_parametrized_name', 'model_post_init', 'model_rebuild', 'model_validate', 'model_validate_json', 'model_validate_strings', 'parse_file', 'parse_obj', 'parse_raw', 'run_after', 'run_id', 'run_type', 'schema', 'schema_json', 'start_date', 'update_forward_refs', 'validate'] ``` So there is a massive difference in the information loaded in the `context` for the task callbacks vs the DAG callback. Is this expected or is it a bug? As I said it seems to be a bug. It seems to be a matter of adding `dag_run: self,` [here](https://github.com/apache/airflow/blob/6b618efa91b4ef80c4821537b30f14a49c2badb6/airflow-core/src/airflow/models/dagrun.py#L1356). Thanks, Fede GitHub link: https://github.com/apache/airflow/discussions/53618 ---- This is an automatically sent email for commits@airflow.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@airflow.apache.org