GitHub user Wolferise created a discussion: Accessing TaskInstances from DagRun 
context after Airflow 3.x API changes

We are currently migrating from Airflow **2.8.1** to **3.x** and came across a 
challenge with some legacy code.

In Airflow 2.x, we used the following approach inside a task to retrieve all 
`TaskInstance` objects for the current `DagRun`:

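```
from airflow.models import TaskInstance
from airflow.utils.state import State


def legacy_func(**context):
    dag_run = context['dag_run']
    session = context['session']
    failed_lst = []
    # Load every TaskInstance that belongs to the current DagRun.
    task_instances = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag_run.dag_id,
        TaskInstance.run_id == dag_run.run_id
    ).all()

    # Collect the ids of all tasks that have not (yet) succeeded.
    for task_instance in task_instances:
        if task_instance.current_state(session=session) != State.SUCCESS:
            failed_lst.append(task_instance.task_id)
    ...
```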


This worked in 2.8.1, but in 3.x direct DB sessions from within tasks are no 
longer supported due to the API refactor.
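
The part of the legacy function that decides which tasks count as "not successful" does not itself need a DB session. A minimal sketch of that step in isolation, assuming the states have already been obtained by some other means as a `task_id -> state` mapping (the helper name and the mapping shape are hypothetical, not an Airflow API):

```python
from typing import Mapping, Optional

# String value of State.SUCCESS in Airflow.
SUCCESS = "success"


def non_successful_task_ids(states: Mapping[str, Optional[str]]) -> list[str]:
    """Return the ids of tasks whose state is anything other than success.

    `states` is a hypothetical mapping of task_id -> state string;
    a state of None means the task has no state yet.
    """
    return [task_id for task_id, state in states.items() if state != SUCCESS]


# Made-up example data, only to show the behaviour.
example_states = {"extract": "success", "transform": "failed", "load": None}
print(non_successful_task_ids(example_states))  # ['transform', 'load']
```

As in the legacy loop, the task doing the check is itself normally in the `running` state while it executes, so it would also show up in the result unless it is filtered out.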

I expected to be able to use something like this instead:

```
def check_task_instance_status(**context):
    dagrun = context['dag_run']
    for ti in dagrun.get_task_instances():
        ...
```

However, in Airflow 3.x the `context['dag_run']` object is an instance of 
`airflow.sdk.api.datamodels._generated.DagRun`, which does not provide the 
`get_task_instances()` method (unlike `airflow.models.DagRun` in earlier 
versions).
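
For comparison, the kind of custom API call in question could look roughly like the sketch below. It assumes the Airflow 3 public REST API is reachable from the worker at `base_url` and that a valid bearer `token` has been obtained elsewhere; the helper names are hypothetical, and pagination and mapped task instances are ignored for brevity.

```python
import json
import urllib.request
from urllib.parse import quote


def task_instances_url(base_url: str, dag_id: str, run_id: str) -> str:
    """Build the REST API URL that lists the task instances of one DagRun."""
    return (
        f"{base_url.rstrip('/')}/api/v2/dags/{quote(dag_id, safe='')}"
        f"/dagRuns/{quote(run_id, safe='')}/taskInstances"
    )


def states_from_payload(payload: dict) -> dict:
    """Reduce a task-instance collection response to task_id -> state.

    Mapped task instances share a task_id, so they collapse to one entry here.
    """
    return {ti["task_id"]: ti.get("state") for ti in payload.get("task_instances", [])}


def fetch_task_states(base_url: str, token: str, dag_id: str, run_id: str) -> dict:
    """Fetch the first page of task instances for a DagRun and return their states."""
    request = urllib.request.Request(
        task_instances_url(base_url, dag_id, run_id),
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return states_from_payload(json.load(response))
```

Inside a task, `dag_id` and `run_id` would come from `context['dag_run']`, as in the snippets above.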

Question:
Is there a recommended way in Airflow 3.x to retrieve all `TaskInstance` 
objects (and their states) for the current `DagRun` directly from within a 
task, without writing custom API calls?

Any documentation pointers or examples would be greatly appreciated.

GitHub link: https://github.com/apache/airflow/discussions/56037
