GitHub user TKul6 created a discussion: Support triggering DAGs based on conditional assets

Hi, thanks for finding the time to read this.

## 🟦 Current State

We operate a centralized Airflow 3 installation that serves the entire company. 
 
Each team owns a bundle of DAGs that may act as **Producers** or **Consumers**.

- Producer DAGs emit **asset events** when they complete.
- Consumer DAGs should start **only after all required producer DAGs have 
finished**.
- Each asset event includes an `extra` field containing a `customerId`.

The goal is to trigger a consumer DAG **only when all producer asset events for 
the same customer** have been published.
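
To pin down the desired semantics, here is a minimal pure-Python model with no Airflow imports. It assumes each asset event can be reduced to an asset name plus its `extra` dict, and that the consumer needs at least one event per required asset for a given `customerId`. The `AssetEvent` type and the `ready_customers` helper are hypothetical and exist only for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class AssetEvent:
    """Hypothetical, simplified stand-in for an Airflow asset event."""
    asset: str                                  # e.g. "producer1"
    extra: dict = field(default_factory=dict)   # e.g. {"customerId": "c1"}


def ready_customers(events, required_assets):
    """Return the customerIds for which every required asset has an event."""
    seen = defaultdict(set)  # customerId -> asset names observed for it
    for event in events:
        customer = event.extra.get("customerId")
        if customer is not None:
            seen[customer].add(event.asset)
    required = set(required_assets)
    return {c for c, assets in seen.items() if required <= assets}


events = [
    AssetEvent("producer1", {"customerId": "c1"}),
    AssetEvent("producer2", {"customerId": "c1"}),
    AssetEvent("producer1", {"customerId": "c2"}),
]
print(ready_customers(events, ["producer1", "producer2"]))  # {'c1'}
```

Customer `c2` is not ready yet because only `producer1` has reported for it.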

---

## 🟥 The Issue

I initially considered using Airflow’s **Asset Scheduling** feature and 
composing conditions like:

```python
producer1Asset & producer2Asset
```

However, the challenge is that each asset event contains a `customerId`, and 
Airflow’s asset scheduling does not natively support:

- Grouping or correlating asset events by a dynamic field such as `customerId`
- Triggering a DAG only when *all* required assets for the *same* customer have 
arrived
- Querying asset events by the `extra` field via the REST API
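
The gap is easiest to see side by side. The sketch below uses a simplified model (an assumption about the semantics, not Airflow's scheduler code) in which `producer1Asset & producer2Asset` roughly means "each asset has at least one new event", regardless of `extra`. Events for two different customers then satisfy the condition even though neither customer is complete. The helper names are hypothetical.

```python
def plain_and_satisfied(events, required_assets):
    """Model of an asset AND condition: each required asset has any event."""
    seen = {asset for asset, _extra in events}
    return set(required_assets) <= seen


def per_customer_satisfied(events, required_assets, customer_id):
    """Desired condition: every required asset has an event for this customer."""
    seen = {
        asset
        for asset, extra in events
        if extra.get("customerId") == customer_id
    }
    return set(required_assets) <= seen


# producer1 finished for customer A, producer2 finished for customer B.
events = [
    ("producer1", {"customerId": "A"}),
    ("producer2", {"customerId": "B"}),
]
required = ["producer1", "producer2"]

print(plain_and_satisfied(events, required))          # True: the DAG would run
print(per_customer_satisfied(events, required, "A"))  # False
print(per_customer_satisfied(events, required, "B"))  # False
```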

One workaround would be to trigger the consumer DAG on **every** asset event and then query the REST API to check whether all required events exist. However, this approach has two problems:

1. The REST API cannot filter by the `extra` field (e.g., `customerId`).  
2. It would cause a large number of unnecessary DAG runs that immediately exit 
because not all prerequisites are met.
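
A rough sketch of that workaround, assuming the consumer knows the triggering event's `customerId` and can fetch recent asset events as plain dicts. The `asset`/`extra` keys are a hypothetical shape, not the exact REST API response schema. Because the `extra` filter has to run client-side, every run pulls unrelated events, and with N required producers roughly N-1 of the N runs per customer exit early.

```python
def should_proceed(customer_id, fetched_events, required_assets):
    """Client-side check at the start of the consumer DAG.

    fetched_events is a list of dicts with hypothetical "asset" and "extra"
    keys; the filter on extra happens here because the API cannot do it.
    """
    matching = {
        e["asset"]
        for e in fetched_events
        if e.get("extra", {}).get("customerId") == customer_id
    }
    return set(required_assets) <= matching


def simulate_runs(arrivals, required_assets):
    """Trigger one consumer run per event; count runs that exit early."""
    history, proceeded, exited_early = [], 0, 0
    for event in arrivals:
        history.append(event)
        customer = event["extra"]["customerId"]
        if should_proceed(customer, history, required_assets):
            proceeded += 1
        else:
            exited_early += 1
    return proceeded, exited_early


required = ["producer1", "producer2", "producer3"]
arrivals = [
    {"asset": asset, "extra": {"customerId": customer}}
    for customer in ("A", "B")
    for asset in required
]
print(simulate_runs(arrivals, required))  # (2, 4)
```

Two customers with three producers each yield six consumer runs, of which four do nothing.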

---

## 🟩 Proposed Direction / Possible Solution

One idea is to introduce a shared identifier such as:

- `traceId`
- `correlationId`
- `groupId`

This ID would be included in each producer’s asset event.  
The consumer DAG would then trigger **only when all required asset events with the same `correlationId` have been published**.
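
One way to express these semantics is a small "join" keyed by correlation ID: record which producer assets have reported for each ID and fire exactly once when the set is complete. This is a standalone Python sketch of the idea, not an existing Airflow feature; `CorrelationJoin` and the `correlationId` key inside `extra` are hypothetical.

```python
class CorrelationJoin:
    """Fire once per correlation ID when all required assets have reported."""

    def __init__(self, required_assets):
        self.required = frozenset(required_assets)
        self.pending = {}   # correlationId -> set of assets seen so far
        self.fired = set()  # correlationIds that already triggered

    def on_event(self, asset, extra):
        """Record one asset event; return the correlationId if it completes."""
        cid = extra.get("correlationId")
        if cid is None or cid in self.fired or asset not in self.required:
            return None
        seen = self.pending.setdefault(cid, set())
        seen.add(asset)
        if seen >= self.required:
            del self.pending[cid]
            self.fired.add(cid)
            return cid  # the consumer DAG would be triggered here
        return None


join = CorrelationJoin(["producer1", "producer2"])
print(join.on_event("producer1", {"correlationId": "run-1"}))  # None
print(join.on_event("producer2", {"correlationId": "run-2"}))  # None
print(join.on_event("producer2", {"correlationId": "run-1"}))  # run-1
print(join.on_event("producer1", {"correlationId": "run-1"}))  # None (already fired)
```

Tracking `fired` separately from `pending` keeps a late duplicate event from triggering a second consumer run for the same ID.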

This raises the question:

**Can Airflow’s asset scheduling or event‑based triggers support correlating 
asset events by a shared dynamic identifier (e.g., correlationId) and 
triggering a DAG only when all matching events are present?**

Some context that may help:
1. I have a single starting point that kicks off all the producers, so I can generate the correlation ID there.
2. I can guarantee a well-defined time window within which the asset events are published.
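
These two constraints simplify the problem. The initiator can mint the ID with `uuid.uuid4()` and fix a deadline; events arriving after the deadline, or a group still incomplete when it passes, can be discarded instead of waited on forever. A minimal sketch, assuming event timestamps are comparable timezone-aware `datetime` values; `start_batch` and `is_complete_in_window` are hypothetical names.

```python
import uuid
from datetime import datetime, timedelta, timezone


def start_batch(window, now=None):
    """Initiator side: mint a correlation ID and the deadline for its events."""
    now = now or datetime.now(timezone.utc)
    return str(uuid.uuid4()), now + window


def is_complete_in_window(events, correlation_id, deadline, required_assets):
    """True if every required asset reported for this ID before the deadline.

    events is a list of (asset, extra, timestamp) tuples; late events are ignored.
    """
    on_time = {
        asset
        for asset, extra, ts in events
        if extra.get("correlationId") == correlation_id and ts <= deadline
    }
    return set(required_assets) <= on_time


t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
cid, deadline = start_batch(timedelta(hours=1), now=t0)
events = [
    ("producer1", {"correlationId": cid}, t0 + timedelta(minutes=10)),
    ("producer2", {"correlationId": cid}, t0 + timedelta(minutes=90)),  # late
]
print(is_complete_in_window(events, cid, deadline, ["producer1", "producer2"]))  # False
```

Each producer would copy `cid` into the `extra` of the event it emits, so the consumer side only has to compare IDs and timestamps.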

I couldn't find such a feature in the docs. Do you think this could be supported?

GitHub link: https://github.com/apache/airflow/discussions/63902
