GitHub user Wolferise created a discussion: Accessing TaskInstances from DagRun
context after Airflow 3.x API changes
We are currently migrating from Airflow **2.8.1** to **3.x** and came across a
challenge with some legacy code.
In Airflow 2.x, we used the following approach inside a task to retrieve all
`TaskInstance` objects for the current `DagRun`:
```
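from airflow.models import TaskInstance
from airflow.utils.state import State


def legacy_func(**context):
    dag_run = context['dag_run']
    session = context['session']
    failed_lst = []
    # Query every TaskInstance that belongs to the current DagRun.
    task_instances = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag_run.dag_id,
        TaskInstance.run_id == dag_run.run_id
    ).all()
    # Collect the task_ids of all tasks that did not finish successfully.
    for task_instance in task_instances:
        if task_instance.current_state(session=session) != State.SUCCESS:
            failed_lst.append(task_instance.task_id)
    ...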
```
This worked in 2.8.1, but in 3.x direct DB sessions from within tasks are no
longer supported due to the API refactor.
I expected to be able to use something like this instead:
```
def check_task_instance_status(**context):
    dagrun = context['dag_run']
    for ti in dagrun.get_task_instances():
        ...
```
However, in Airflow 3.x the `context['dag_run']` object is an instance of
`airflow.sdk.api.datamodels._generated.DagRun`, which does not provide the
`get_task_instances()` method (unlike `airflow.models.DagRun` in earlier
versions).
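For comparison, the custom-API-call route that this question hopes to avoid might look roughly like the sketch below. It is a minimal sketch under assumptions, not a recommended solution: it assumes the Airflow 3 REST API lists task instances at `/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances`, that the response carries a `task_instances` list whose entries have `task_id` and `state`, and that a bearer token is already available. The function names, `base_url`, and `token` are hypothetical, and pagination is not handled.

```python
import json
import urllib.parse
import urllib.request


def task_instances_url(base_url, dag_id, run_id):
    """Build the listing URL for one DagRun (endpoint path is an assumption)."""
    return "{}/api/v2/dags/{}/dagRuns/{}/taskInstances".format(
        base_url.rstrip("/"),
        # run_ids usually contain ':' and '+', so percent-encode everything.
        urllib.parse.quote(dag_id, safe=""),
        urllib.parse.quote(run_id, safe=""),
    )


def non_success_task_ids(payload):
    """Return the task_ids whose state is anything other than 'success'."""
    return [
        ti["task_id"]
        for ti in payload.get("task_instances", [])
        if ti.get("state") != "success"
    ]


def fetch_non_success(base_url, token, dag_id, run_id):
    """Fetch the first page of task instances and filter it (hypothetical helper)."""
    request = urllib.request.Request(
        task_instances_url(base_url, dag_id, run_id),
        headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        },
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return non_success_task_ids(json.load(response))
```

Inside a task this would be called with `dag_run.dag_id` and `dag_run.run_id` from the context, which is exactly the kind of hand-written plumbing the 2.x session query did not need.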
Question:
Is there a recommended way in Airflow 3.x to retrieve all `TaskInstance`
objects (and their states) for the current `DagRun` directly from within a
task, without writing custom API calls?
Any documentation pointers or examples would be greatly appreciated.
GitHub link: https://github.com/apache/airflow/discussions/56037