sjyangkevin commented on PR #50182:
URL: https://github.com/apache/airflow/pull/50182#issuecomment-2926666261

   Want to share some small updates. I've merged the two queries into one, run all the unit tests in jobs/, created an actual DAG to test the behavior, and profiled the query runtime. I observed the following behaviors for the DAG (attached at the end of this comment):
   
   1. I use `asset_1 | asset_2 | asset_alias` to schedule a consumer DAG, and have a producer DAG emit both regular asset events and asset events attached to the alias at runtime. This results in two consumer DAG runs: one for the regular assets, and another for the asset alias (including all attached asset events).
   2. Both the assets and the asset alias carry their asset events through to the consumer DAG.
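   The two-run behavior above can be modeled with a small Airflow-free sketch (purely illustrative; `group_events` and the event tuples are my own stand-ins, not Airflow APIs): events from regular assets and events attached to an alias are grouped separately, and each non-empty group becomes one consumer DAG run.

```python
# Toy model of the observed scheduling: regular asset events and
# alias-attached asset events land in separate groups, so one producer
# run yields two consumer DAG runs.

def group_events(events):
    """Split (source_kind, name) event tuples into per-run groups."""
    regular = [e for e in events if e[0] == "asset"]
    aliased = [e for e in events if e[0] == "alias"]
    # Each non-empty group corresponds to one consumer DAG run.
    return [g for g in (regular, aliased) if g]

events = [("asset", "my_asset_1"), ("asset", "my_asset_2")] + [
    ("alias", f"my_asset_alias_{i}") for i in range(1, 101)
]
runs = group_events(events)
print(len(runs))                    # 2 consumer runs
print(len(runs[0]), len(runs[1]))   # 2 regular events, 100 alias events
```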
   
   ![Screenshot from 2025-06-01 02-21-03](https://github.com/user-attachments/assets/50b7cc83-8975-4b03-9569-4c2186057a87)
   
   Since the query uses multiple `.any()` calls, I was a little concerned about performance. So I had the producer DAG first create 100 asset events and attach them to the alias.
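   For reference, a minimal way to time a single call in isolation looks like the sketch below (`run_query` here is a hypothetical stand-in for the real query call, not the actual implementation in this PR):

```python
import time

def run_query():
    # Stand-in for the real query being profiled; any callable works here.
    return sum(range(10_000))

start = time.perf_counter()
result = run_query()
elapsed = time.perf_counter() - start
print(f"runtime: {elapsed:.4f} seconds")
```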
   
   ### Two queries (the first implementation)
   Since it results in two DAG runs, the first run captures the 2 regular `Asset` events; its query runtime is 0.0065 seconds. The second run captures the 100 asset events attached to the alias; its runtime is 0.0063 seconds.
   ![Screenshot from 2025-06-01 01-30-08](https://github.com/user-attachments/assets/7b9bb76f-a254-4b9c-bccb-702993a07d4b)
   ![Screenshot from 2025-06-01 01-31-07](https://github.com/user-attachments/assets/a48dc688-0742-4145-8063-2785f53f28d4)
   
   ### One query (merged, current implementation)
   The runtimes for the two DAG runs are 0.0050 seconds and 0.0060 seconds, respectively.
   ![Screenshot from 2025-06-01 01-38-43](https://github.com/user-attachments/assets/88e9661d-2afa-41df-b27c-aa6e8f31f237)
   ![Screenshot from 2025-06-01 01-38-14](https://github.com/user-attachments/assets/d96c1b52-c1d7-4e8b-9e9d-f321b0f3a669)
   
   ### Producer DAG
   ```python
   from airflow.sdk import Asset, AssetAlias, DAG
   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.providers.standard.operators.empty import EmptyOperator
   
   asset_alias = AssetAlias("asset_alias_massive_events")
   asset_1     = Asset("my_asset_1")
   asset_2     = Asset("my_asset_2")
   
   with DAG(
       dag_id="profile_asset_query_producer",
   ):
       
       def _generate_regular_asset_event():
           print("generate 100 asset events for regular assets")
   
       def _attach_asset_event_to_alias(**context):
           outlet_events = context["outlet_events"]
        for i in range(1, 101):  # also tried range(1, 1001) to generate 1000 asset events
               outlet_events[asset_alias].add(
                   Asset(f"my_asset_alias_{i}"),
                   extra={"filename": f"my_asset_alias_{i}"}
               )
   
       start = EmptyOperator(task_id="start")
   
       generate_regular_asset_event = PythonOperator(
           task_id="generate_regular_asset_event",
           python_callable=_generate_regular_asset_event,
           outlets=[asset_1, asset_2]
       )
   
       attach_asset_event_to_alias = PythonOperator(
           task_id="attach_asset_event_to_alias",
           python_callable=_attach_asset_event_to_alias,
           outlets=[asset_alias]
       )
   
       end = EmptyOperator(task_id="end")
       
        start >> [generate_regular_asset_event, attach_asset_event_to_alias] >> end
   ```
   
   ### Consumer DAG
   ```python
   from airflow.sdk import Asset, AssetAlias, DAG
   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.providers.standard.operators.empty import EmptyOperator
   
   asset_alias = AssetAlias("asset_alias_massive_events")
   asset_1     = Asset("my_asset_1")
   asset_2     = Asset("my_asset_2")
   
   with DAG(
       dag_id="profile_asset_query_consumer",
       schedule=(asset_1 | asset_2 | asset_alias)
   ):
       
        def _log_asset_events(**context):
            print("context:", context)
   
       log_asset_events = PythonOperator(
           task_id="log_asset_events",
           python_callable=_log_asset_events,
           inlets=[asset_1, asset_2, asset_alias]
       )
   ```

