sjyangkevin commented on issue #39017:
URL: https://github.com/apache/airflow/issues/39017#issuecomment-2853230727
Thank you very much for the follow-up, and apologies for the long write-up; I wanted to explain my thinking in full. Please let me know if there are any follow-up questions.
I want to explain it using my DAG code and logs. As I understand the use case, each event is mapped to its own asset. Suppose the producer DAG pulls three file drop events and generates three different assets:
`file_A_2025-05-03` -> `Asset(name='file_A_2025-05-03')`
`file_A_2025-05-04` -> `Asset(name='file_A_2025-05-04')`
`file_A_2025-05-05` -> `Asset(name='file_A_2025-05-05')`
I think the original objective is to have the consumer DAG trigger on updates to `Asset(name='file_A_{year}-{month}-{day}')`, but asset names don't support regex-style definitions. After a few experiments, I found that we can create `Asset`s and attach the corresponding asset events to an **Asset Alias** via `outlet_events`. The consumer DAG can then be triggered on the Asset Alias. This way, the producer DAG can perform regex/wildcard matching (in DAG code) and create an asset event on each match. Please see the DAG code below for my experiment. Now, to the question:
> Do you mean `extra`? I think they're there. Just not included in the
`__repr__`.
### TL;DR
**The `extra` seems to be available only when the DAG is triggered on an `Asset`, not when it is triggered on an `AssetAlias`.**
```
[2025-05-05, 23:33:37] INFO - inlet_events [inlets]:
AssetAlias(name='file_asset_event', group='asset'): chan="stdout": source="task"
[2025-05-05, 23:33:37] INFO - inlet_events [asset]: {}: chan="stdout":
source="task"
[2025-05-05, 23:33:37] INFO - inlet_events [asset alias]:
{AssetAliasUniqueKey(name='file_asset_event'):
AssetAlias(name='file_asset_event', group='asset')}: chan="stdout":
source="task"
[2025-05-05, 23:33:37] INFO - triggering_asset_events:
TriggeringAssetEventAccessor(_events=defaultdict(<class 'list'>, {})):
chan="stdout": source="task"
[2025-05-05, 23:33:37] INFO - See Asset Alias `file_asset_event`: []:
chan="stdout": source="task"
```
In the consumer task, I tried to access `inlet_events` and `triggering_asset_events`, and was not able to see either the `Asset` or the `AssetEvent` attached to the `AssetAlias`, as shown above. This information seems to be available only when the DAG is triggered on an `Asset`, as shown in the screenshot below.

### Why does the asset event or asset information matter in this case?
It can be thought of as a workaround that implements the requirement in a simplified way. Suppose a file drop creates `Asset(name='file_A_2025-05-04')` and attaches it to the Asset Alias: if the consumer task can access the asset event, a downstream task triggered on the alias can fetch the asset's `name`, `uri`, or even `extra` from `triggering_asset_events` in the context. This way, for each consumer DAG run triggered by the asset alias, the consumer task can access the corresponding asset event and process the correct file drop. This might also answer:
https://github.com/apache/airflow/pull/50182#pullrequestreview-2816746403.
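To make the intent concrete, here is a minimal sketch of what the consumer would do once the events are visible — plain dicts stand in for the `AssetEvent` objects, and the helper name is hypothetical:

```python
# Stand-ins for the events the consumer would read from
# triggering_asset_events[AssetAlias("file_asset_event")].
events = [
    {"asset_name": "file_A_2025-05-04", "extra": {"filename": "file_A_2025-05-04"}},
]

def files_to_process(events):
    """Pick the exact file drop each triggered run should handle."""
    return [event["extra"]["filename"] for event in events]

print(files_to_process(events))  # -> ['file_A_2025-05-04']
```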
**Producer DAG**
```python
from airflow.sdk import Asset, AssetAlias, DAG, Param
from airflow.providers.standard.operators.python import PythonOperator

# Define the alias once
file_asset_alias = AssetAlias("file_asset_event")

with DAG(
    dag_id="asset_alias_producer",
    params={"filename": Param(type="string")},
):
    def my_function(**context):
        # Here, the filename comes from an Airflow Param for testing purposes,
        # but it could come from a regex match on the filename.
        filename = context["params"]["filename"]
        print("filename:", filename)

        # Define the outlet event and attach it to the alias.
        outlet_events = context["outlet_events"]
        outlet_events[file_asset_alias].add(
            Asset(filename),
            extra={"filename": filename},
        )
        print("outlet_events:", outlet_events)

    my_task = PythonOperator(
        task_id="producer_task",
        python_callable=my_function,
        outlets=[file_asset_alias],  # use the alias as outlet
    )
```
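As the comment in `my_function` hints, the `filename` Param could be replaced by actual regex/wildcard matching in the producer. A minimal sketch of that matching step, with a hypothetical pattern:

```python
import re

# Hypothetical pattern for the date-stamped file drops, e.g. "file_A_2025-05-04".
FILE_PATTERN = re.compile(r"^file_A_(\d{4})-(\d{2})-(\d{2})$")

def matched_asset_name(filename):
    """Return the filename to use as the Asset name if it matches, else None."""
    return filename if FILE_PATTERN.match(filename) else None

print(matched_asset_name("file_A_2025-05-04"))  # -> file_A_2025-05-04
print(matched_asset_name("unrelated.txt"))      # -> None
```

On a match, the producer would call `outlet_events[file_asset_alias].add(Asset(filename), ...)` exactly as above.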
**Consumer DAG**
```python
from airflow.sdk import Asset, AssetAlias, DAG
from airflow.providers.standard.operators.python import PythonOperator

file_asset_alias = AssetAlias("file_asset_event")

# DAG to trigger on the alias
with DAG(
    dag_id="asset_alias_consumer",
    schedule=[file_asset_alias],
):
    def my_function(**context):
        print("context:", context)

        # Try to access the inlet events from the context.
        inlet_events = context["inlet_events"]
        print("inlet_events [inlets]:", inlet_events._inlets[0])
        print("inlet_events [asset]:", inlet_events._assets)
        print("inlet_events [asset alias]:", inlet_events._asset_aliases)

        # Try to access the triggering asset events.
        triggering_asset_events = context["triggering_asset_events"]
        print("triggering_asset_events:", triggering_asset_events)
        print(
            "See Asset Alias `file_asset_event`:",
            triggering_asset_events[AssetAlias("file_asset_event")],
        )
        for event in triggering_asset_events[AssetAlias("file_asset_event")]:
            print("Received asset URI:", event.asset.key.name)
            print("Extra metadata:", event.extra)

    my_task = PythonOperator(
        task_id="consumer_task",
        python_callable=my_function,
        inlets=[file_asset_alias],
    )
```
The consumer DAG is triggered when an asset event is attached to the Asset Alias, and the logs shown above are generated.