sjyangkevin commented on PR #50182: URL: https://github.com/apache/airflow/pull/50182#issuecomment-2926666261
Want to share some small updates. I've merged the two queries into one, run all the unit tests in `jobs/`, created an actual DAG to test the behavior, and profiled the query runtime. I observed the following behaviors with the DAGs attached at the end of this comment:

1. I use `asset_1 | asset_2 | asset_alias` to schedule a consumer DAG. A producer DAG emits all the events, both regular asset events and asset events attached to the alias at runtime. This results in two consumer DAG runs: one for the regular assets, and another for the asset alias (including all asset events attached to it).
2. Both the asset and the asset alias carry their asset events through to the consumer DAG.

Since the merged query uses multiple `.any()` calls, I was a little concerned about performance. So I had the producer DAG first create 100 asset events attached to the alias.

### Two queries (the first implementation)

Since this results in two DAG runs, the first run captures the 2 asset events of the regular `Asset`s; the query runtime is 0.0065 seconds. The second run captures the 100 asset events attached to the alias; the runtime is 0.0063 seconds.

### One query (merged, current implementation)

The runtimes are 0.0050 seconds and 0.0060 seconds, respectively.
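For context, the runtimes above come from timing just the scheduling query. A minimal sketch of that kind of measurement (here `run_query` is a hypothetical stand-in for the actual query call, not the real Airflow function):

```python
import time

def run_query():
    # Hypothetical stand-in for the merged asset / asset-alias scheduling
    # query; replace with the actual query call when profiling for real.
    return sum(range(1000))

# Wall-clock the single call with a high-resolution monotonic timer.
start = time.perf_counter()
result = run_query()
elapsed = time.perf_counter() - start
print(f"query runtime: {elapsed:.4f} seconds")
```

`time.perf_counter()` is preferable to `time.time()` here because it is monotonic and has the highest available resolution for short intervals.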
### Producer DAG

```python
from airflow.sdk import Asset, AssetAlias, DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.empty import EmptyOperator

asset_alias = AssetAlias("asset_alias_massive_events")
asset_1 = Asset("my_asset_1")
asset_2 = Asset("my_asset_2")

with DAG(
    dag_id="profile_asset_query_producer",
):

    def _generate_regular_asset_event():
        print("generate asset events for regular assets")

    def _attach_asset_event_to_alias(**context):
        outlet_events = context["outlet_events"]
        for i in range(1, 101):  # I also tried 1001, generating 1000 asset events
            outlet_events[asset_alias].add(
                Asset(f"my_asset_alias_{i}"), extra={"filename": f"my_asset_alias_{i}"}
            )

    start = EmptyOperator(task_id="start")
    generate_regular_asset_event = PythonOperator(
        task_id="generate_regular_asset_event",
        python_callable=_generate_regular_asset_event,
        outlets=[asset_1, asset_2],
    )
    attach_asset_event_to_alias = PythonOperator(
        task_id="attach_asset_event_to_alias",
        python_callable=_attach_asset_event_to_alias,
        outlets=[asset_alias],
    )
    end = EmptyOperator(task_id="end")

    start >> [generate_regular_asset_event, attach_asset_event_to_alias] >> end
```

### Consumer DAG

```python
from airflow.sdk import Asset, AssetAlias, DAG
from airflow.providers.standard.operators.python import PythonOperator

asset_alias = AssetAlias("asset_alias_massive_events")
asset_1 = Asset("my_asset_1")
asset_2 = Asset("my_asset_2")

with DAG(
    dag_id="profile_asset_query_consumer",
    schedule=(asset_1 | asset_2 | asset_alias),
):

    def _log_asset_events(**context):
        print("context:", context)

    log_asset_events = PythonOperator(
        task_id="log_asset_events",
        python_callable=_log_asset_events,
        inlets=[asset_1, asset_2, asset_alias],
    )
```
