GitHub user TKul6 created a discussion: Support triggering DAGS based on conditional assets
Hi, thanks for finding the time to read this.

## 🟦 Current State

We operate a centralized Airflow 3 installation that serves the entire company. Each team owns a bundle of DAGs that may act as **Producers** or **Consumers**.

- Producer DAGs emit **asset events** when they complete.
- Consumer DAGs should start **only after all required producer DAGs have finished**.
- Each asset event includes an `extra` field containing a `customerId`.

The goal is to trigger a consumer DAG **only when all producer asset events for the same customer** have been published.

---

## 🟥 The Issue

I initially considered using Airflow’s **Asset Scheduling** feature and composing conditions like:

```
producer1Asset & producer2Asset
```

However, the challenge is that each asset event contains a `customerId`, and Airflow’s asset scheduling does not natively support:

- Grouping or correlating asset events by a dynamic field such as `customerId`
- Triggering a DAG only when *all* required assets for the *same* customer have arrived
- Querying asset events by the `extra` field via the REST API

A workaround could be triggering the consumer DAG on **every** asset event and then querying the REST API to check whether all required events exist. But this has two problems:

1. The REST API cannot filter by the `extra` field (e.g., `customerId`).
2. It would cause a large number of unnecessary DAG runs that immediately exit because not all prerequisites are met.

---

## 🟩 Proposed Direction / Possible Solution

One idea is to introduce a shared identifier such as:

- `traceId`
- `correlationId`
- `groupId`

This ID would be included in each producer’s asset event. The consumer DAG would then trigger **only when all required asset events with the same correlationId have been published**.
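To make the idea concrete, here is a minimal sketch in plain Python of the correlation behavior described above. It is not an Airflow API: `CorrelationTracker`, `record`, and the `correlationId` key inside `extra` are hypothetical names used only for illustration, and the state lives in memory, whereas a real deployment would presumably need a persistent store.

```python
from dataclasses import dataclass, field


@dataclass
class CorrelationTracker:
    """Tracks which required assets have reported for each correlation ID.

    Hypothetical helper for illustration; not part of Airflow.
    """

    required_assets: frozenset
    # Maps correlation ID -> names of the assets seen so far for that ID.
    _seen: dict = field(default_factory=dict)

    def record(self, asset_name: str, extra: dict) -> bool:
        """Record one asset event; return True when its group is complete."""
        correlation_id = extra.get("correlationId")
        if correlation_id is None or asset_name not in self.required_assets:
            # Events without an ID, or from unrelated assets, are ignored.
            return False
        seen = self._seen.setdefault(correlation_id, set())
        seen.add(asset_name)
        if seen == self.required_assets:
            # Drop the completed group so the consumer fires once per ID.
            del self._seen[correlation_id]
            return True
        return False


tracker = CorrelationTracker(frozenset({"producer1Asset", "producer2Asset"}))
first = tracker.record("producer1Asset", {"correlationId": "run-1", "customerId": "A"})
second = tracker.record("producer2Asset", {"correlationId": "run-1", "customerId": "A"})
print(first, second)
```

In this sketch the consumer would be started only when `record` returns `True`, that is, once every required asset has reported with the same ID. Events carrying a different ID accumulate separately and do not affect each other.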
This raises the question:

**Can Airflow’s asset scheduling or event-based triggers support correlating asset events by a shared dynamic identifier (e.g., correlationId) and triggering a DAG only when all matching events are present?**

I can say that:

1. I have a single starting point that starts all the producers (and I can generate the correlation ID there).
2. I can have a well-defined window in which the asset events can be published.

I couldn't find such a feature in the docs. Do you think we can do it?

GitHub link: https://github.com/apache/airflow/discussions/63902
