GitHub user hnomichith added a comment to the discussion: Add the ability to 
backfill a DAG based on past Asset Events

I think @peter's use case is solved by the task clearing feature, right?

For @ldacey's use case, your only message makes me think it is solved as well, 
right? I'm not sure I understand how it relates to the previously mentioned 
use cases.

Regarding the initial need to process past asset events, I requested an 
account to create an AIP. As suggested in the AIP instructions, I first wrote 
an email.

If you are interested, here is my draft. I was a bit lost because the AIP 
instructions about major changes implicitly exclude changes to the Airflow 3 
REST APIs by mentioning only the Airflow 2 REST APIs. But I guess it's still 
worth seeing whether they have more insight into the potential consequences, 
or into whether it would not work.

```markdown
# Modify Airflow 3 REST endpoints in order to queue past Asset events for a DAG

## Motivation

Let's consider three DAGs: `DAG_1`, `DAG_2` and `DAG_3`.

`DAG_1` creates data at irregular intervals, and that data must then be 
processed by `DAG_2` and `DAG_3`.

This use case seems to be addressed by Airflow Assets: `DAG_1` updates an 
Airflow asset, and this update triggers `DAG_2` and `DAG_3`.

The problem: `DAG_2` or `DAG_3` can be created later than `DAG_1`, but they 
still need to process the past data updates.

To properly address this use case, we need a way to trigger `DAG_2` or `DAG_3` 
based on past Asset events. Additionally, it should be possible to trigger 
`DAG_2` and `DAG_3` independently, since nothing conceptually ties them 
together.

## Considerations

### What change do you propose to make?

I propose to modify Airflow 3 REST endpoints.

#### Add `POST /api/v2/dags/:dag_id/assets/queuedEvents`

This endpoint would take as input a list of existing Asset Event IDs. It would 
then queue the corresponding Asset Events for the DAG, as if they had just 
happened.

As a consequence, from what I understand of [the scheduler 
code](https://github.com/apache/airflow/blob/c8aa74a19e39be75800ebd13289bf0713b9718b4/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L1728),
 it will create DAG runs (if the DAG reacts to this Asset event only).

#### Add a `name` filter to `GET /api/v2/assets/events`

In order to programmatically find the relevant Asset Event IDs, I would need 
the ability to filter Asset Events by name.

### Why is it needed?

Thanks to those two endpoints, I could create a script that would:
1. Get the list of past Asset Events for a given Asset, using `GET 
/api/v2/assets/events?name=...` (with the timestamps).
2. For each of those Asset Events, call `POST 
/api/v2/dags/:dag_id/assets/queuedEvents` to queue them for the DAG.

### Are there any downsides to this change?

From my understanding, these changes will add more features to maintain and 
more complexity to the API.

This idea was originally discussed on [Airflow 
GitHub](https://github.com/apache/airflow/discussions/59886), where I was 
advised to create an AIP even though it might not qualify as a ["major change 
that needs an 
AIP"](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89066602#AirflowImprovementProposals-WhatisconsideredamajorchangethatneedsanAIP?).
 Here is the rationale:
> Possibly, you could use QueuedEvents, but this is not a question of "tweaking 
> an API" - but designing and implementing huge new feature, considering data 
> usage, consequences, storage mechanisms and persistency of the data as well 
> as performance impact of such re-valuations.

Therefore, I'm counting on this AIP to identify the potential consequences of 
this change, and to discuss the best way to implement it.

### Which users are affected by the change?

It shouldn't affect anyone negatively, since these are non-breaking changes to 
the API. It would just allow users to trigger DAG runs based on past Asset 
events, which is currently not possible.

### How are users affected by the change? (e.g. DB upgrade required?)

Not affected.

### What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)

None.

### Other considerations?

There's currently a possible workaround for this use case. Still using a script 
and the Airflow 3 REST API, users can:
1. Pause all DAGs but the one they want to trigger (e.g. to trigger `DAG_2`, 
pause `DAG_1` and `DAG_3`).
2. Get the list of past Asset Events, using the `source_dag_id` filter to only 
get the events related to `DAG_1`.
3. For each of those Asset Events, call `POST /api/v2/assets/events` to create 
a synthetic Asset Event with the same content as the past one.
4. Unpause the DAGs that have been previously paused.

This will trigger `DAG_2` for each of those past events, without re-triggering 
`DAG_3`.

However, I think it abuses the Asset Events concept, since it creates Asset 
Events that don't correspond to real data updates. Moreover, it has 
drawbacks:
- It does not work with cross-deployment DAGs. 
- Triggering it twice will create duplicate events that must be handled 
separately. 

### What defines this AIP as "done"?

The two endpoints are implemented and documented, meeting Airflow's usual 
quality standards.
```
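
For illustration, the script described under "Why is it needed?" could look 
roughly like the sketch below. It is only a sketch under several assumptions: 
the `name` filter and the `POST .../assets/queuedEvents` endpoint are the ones 
proposed in the draft and do not exist today, the request body field 
`asset_event_ids` is a made-up name, and the paging parameters and the 
`asset_events` / `total_entries` response keys are assumed to behave like the 
existing list endpoint. The HTTP layer is passed in as a hypothetical `call` 
helper so that authentication and the base URL stay out of the sketch.

```python
from typing import Any, Callable, Optional

# Hypothetical HTTP helper supplied by the caller:
# (method, path, query params, JSON body) -> decoded JSON response.
ApiCall = Callable[[str, str, Optional[dict], Optional[dict]], Any]


def list_asset_event_ids(call: ApiCall, asset_name: str, page_size: int = 100) -> list[int]:
    """Collect the IDs of all past Asset Events for one asset, page by page."""
    ids: list[int] = []
    offset = 0
    while True:
        # ASSUMPTION: `name` is the filter proposed in the draft, not an existing one.
        page = call(
            "GET",
            "/api/v2/assets/events",
            {"name": asset_name, "order_by": "timestamp", "limit": page_size, "offset": offset},
            None,
        )
        events = page.get("asset_events", [])
        ids.extend(event["id"] for event in events)
        offset += len(events)
        # Stop on an empty page or once every reported entry has been read.
        if not events or offset >= page.get("total_entries", 0):
            return ids


def queue_past_events(call: ApiCall, dag_id: str, event_ids: list[int]) -> None:
    """Queue existing Asset Events for a single DAG via the PROPOSED endpoint."""
    if event_ids:
        # ASSUMPTION: the body field name `asset_event_ids` is invented for this sketch.
        call(
            "POST",
            f"/api/v2/dags/{dag_id}/assets/queuedEvents",
            None,
            {"asset_event_ids": event_ids},
        )
```

Calling `queue_past_events(call, "DAG_2", list_asset_event_ids(call, "my_asset"))` 
would then queue the past events for `DAG_2` only, leaving `DAG_3` untouched 
(`my_asset` is a placeholder name).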

GitHub link: 
https://github.com/apache/airflow/discussions/59886#discussioncomment-15913495
