amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925425251


##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, 
max_tries: int) -> bool:
     # max_tries is initialised with the retries defined at task level, we do 
not need to explicitly ask for
     # retries from the task SDK now, we can handle using max_tries
     return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, 
asset_type, session):

Review Comment:
   I would really like that too, but it is significantly tricky because of few 
things. 
   
   In the API payload, we have the following details:
   1. The task outlets in `[name, uri]` form with the type of asset it is: 
`Asset, AssetAlias, AssetNameRef, AssetUriRef`.
   2. The events associated with the asset type: for assets we send: 
`events[obj]` and for asset aliases, we send: `events[obj].asset_alias_events` 
where events is `context["outlet_keys"]`. 
   
   This will require adding a lot of `if - else` checks around and better way 
to identify the `obj`, either isinstance or "==" here: 
https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L2759-L2776
   
   Myself and @kaxil were also discussing that the function 
`_register_asset_changes` is overly complicated and would benefit from some 
rewriting too.
   
   I can still give it a shot if you'd like me to.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to