sjyangkevin commented on issue #39017:
URL: https://github.com/apache/airflow/issues/39017#issuecomment-2849005521

   Hi, I want to share some updates on my progress so far and ask a few questions. I would appreciate any feedback or suggestions.
   
   All the assets generated at runtime can be attached to an asset alias. The consumer task can then treat the asset alias as a single “asset” and be triggered whenever a new asset is attached and updated by the producer.
   
   From my perspective, the use cases can be covered either by exposing the **name** or **uri** of the asset attached to the asset alias, or through the `extra` payload of the `triggering_asset_events` in the consumer task's `context`.
   
   However, I've run some experiments and found that this information is not propagated to `inlet_events`, as shown in the consumer task's log below.
   ```
   [2025-05-03, 21:44:35] INFO - inlet_events: 
InletEventsAccessors(_inlets=[AssetAlias(name='file_asset_event', 
group='asset')], _assets={}, 
_asset_aliases={AssetAliasUniqueKey(name='file_asset_event'): 
AssetAlias(name='file_asset_event', group='asset')}): chan="stdout": 
source="task"
   ```
   I did attach the information via `outlet_events` in the producer; below is the log from the producer task.
   ```
   [2025-05-03, 21:44:35] INFO - outlet_events:  
OutletEventAccessors(_dict={AssetAliasUniqueKey(name='file_asset_event'): 
OutletEventAccessor(key=AssetAliasUniqueKey(name='file_asset_event'), extra={}, 
asset_alias_events=[AssetAliasEvent(source_alias_name='file_asset_event', 
dest_asset_key=AssetUniqueKey(name='/raw/data/file_05032025.csv', 
uri='/raw/data/file_05032025.csv'), extra={'filename': 
'/raw/data/file_05032025.csv'})])}): chan="stdout": source="task"
   ```
   In terms of implementation, I am trying to see whether the following information can be made accessible from the consumer task when the consumer DAG is triggered through an asset alias.
   1. `AssetUniqueKey(name='/raw/data/file_05032025.csv', 
uri='/raw/data/file_05032025.csv')` in `inlet_events`.
   2. Or `extra={'filename': '/raw/data/file_05032025.csv'}` in 
`triggering_asset_events`.
   
   Questions:
   1. Is this the right approach to the issue?
   2. Are there any caveats I should keep in mind when populating this information in `inlet_events` and `triggering_asset_events`?
   3. I am looking at `InletEventsAccessors` under `task-sdk/src/airflow/sdk/execution_time/context.py`. Is that the right place to modify what is included in `inlet_events`?
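   To make question 3 concrete, below is a toy, Airflow-free model of the lookup behavior I would expect: indexing by alias resolves to the asset events the producer attached to that alias. Everything here (`ToyAssetEvent`, `ToyInletEventsAccessor`) is a hypothetical sketch for discussion, not actual Airflow API.

   ```python
from dataclasses import dataclass, field

@dataclass
class ToyAssetEvent:
    # the fields I would like to see propagated through the alias
    name: str
    uri: str
    extra: dict = field(default_factory=dict)

class ToyInletEventsAccessor:
    """Toy stand-in for InletEventsAccessors: alias name -> attached events."""

    def __init__(self):
        self._by_alias = {}

    def attach(self, alias_name, event):
        # corresponds to outlet_events[alias].add(...) in the producer
        self._by_alias.setdefault(alias_name, []).append(event)

    def __getitem__(self, alias_name):
        # corresponds to inlet_events[alias] in the consumer
        return self._by_alias.get(alias_name, [])

accessor = ToyInletEventsAccessor()
accessor.attach(
    "file_asset_event",
    ToyAssetEvent(
        name="/raw/data/file_05032025.csv",
        uri="/raw/data/file_05032025.csv",
        extra={"filename": "/raw/data/file_05032025.csv"},
    ),
)
events = accessor["file_asset_event"]
   ```

   In this model, the consumer would get back the same `AssetUniqueKey`-like fields and `extra` payload that the producer recorded, which is exactly what is missing from `inlet_events` today.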
   
   Below is my reasoning for why this approach can resolve the issue.
   1. In the producer task, we can match files by wildcard/regex, turn each matched file into an asset, and attach it to an asset alias.
   2. The file path information can be propagated to the consumer DAG and task for further processing.
   3. The consumer DAG is triggered by the asset alias (it can be treated as a 
single "asset").
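   To make step 1 above concrete, here is a minimal, Airflow-free sketch of the matching logic. The pattern and helper name (`RAW_FILE_PATTERN`, `match_raw_files`) are my own assumptions for illustration; the real producer would then wrap each match in an `Asset` and attach it to the alias.

   ```python
import re

# Hypothetical pattern for the raw files (file_MMDDYYYY.csv, as in the logs above).
RAW_FILE_PATTERN = re.compile(r"^/raw/data/file_\d{8}\.csv$")

def match_raw_files(paths):
    """Return the `extra` payloads for paths that match the raw-file pattern.

    Each matching path would become an Asset attached to the asset alias by
    the producer task, carrying the same `extra` shape as in the log above.
    """
    return [{"filename": p} for p in paths if RAW_FILE_PATTERN.match(p)]

payloads = match_raw_files([
    "/raw/data/file_05032025.csv",
    "/raw/data/notes.txt",
])
   ```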
   
   Below are the producer and consumer DAGs that generated the logs above. Thanks.
   
   Producer
   ```
   from airflow.sdk import Asset, AssetAlias, DAG, Param
   from airflow.providers.standard.operators.python import PythonOperator
   
   # Define the alias once
   file_asset_alias = AssetAlias("file_asset_event")
   
   with DAG(
       dag_id="asset_alias_producer",
       params={"filename": Param(type="string")}
   ):
   
       def my_function(**context):
           filename = context["params"]["filename"]
           print("filename:", filename)
   
           outlet_events = context["outlet_events"]
           outlet_events[file_asset_alias].add(
               Asset(filename),
               extra={"filename": filename}
           )
           print("outlet_events: ", outlet_events)
   
       my_task = PythonOperator(
           task_id="producer_task",
           python_callable=my_function,
           outlets=[file_asset_alias]  # use the same alias
       )
   ```
   
   Consumer
   ```
   from airflow.sdk import Asset, AssetAlias, DAG
   from airflow.providers.standard.operators.python import PythonOperator
   
   file_asset_alias = AssetAlias("file_asset_event")
   
   with DAG(
       dag_id="asset_alias_consumer",
       schedule=[file_asset_alias]
   ):
       
       def my_function(**context):
   
           print("context:", context)
   
           inlet_events = context["inlet_events"]
           print("inlet_events:", inlet_events)
   
           triggering_asset_events = context["triggering_asset_events"]
           for event in triggering_asset_events[AssetAlias("file_asset_event")]:
            print("Received asset name:", event.asset.key.name)
               print("Extra metadata:", event.extra)
   
   
           # for asset, asset_list in triggering_asset_events.items():
           #     print("Asset Name:", asset.name)
           #     print("Asset URI:", asset.uri)
           #     print("Source DAG ID:", asset_list[0].source_dag_id)
           #     print("Source Task ID:", asset_list[0].source_task_id)
   
       my_task = PythonOperator(
           task_id="consumer_task",
           python_callable=my_function,
           inlets=[file_asset_alias]
       )
   ```

