amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925425251
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
     # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
     # retries from the task SDK now, we can handle using max_tries
     return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):

Review Comment:
   I would really like that too, but it is quite tricky for a few reasons.

   In the API payload, we have the following details:
   1. The task outlets in `[name, uri]` form, along with the type of each asset: `Asset`, `AssetAlias`, `AssetNameRef`, or `AssetUriRef`.
   2. The events associated with the asset type: for assets we send `events[obj]`, and for asset aliases we send `events[obj].asset_alias_events`, where `events` is `context["outlet_events"]`.

   This would require adding a lot of `if`/`else` checks and a better way to identify `obj`, using either `isinstance` or `==`, here: https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L2759-L2776

   @kaxil and I were also discussing that the function `_register_asset_changes` is overly complicated and would benefit from a rewrite too.

   I can still give it a shot if you'd like me to.
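
   One possible way to avoid a long chain of `if`/`else` checks is a lookup table keyed by the asset type string. The block below is only a minimal sketch under stated assumptions, not Airflow's implementation: `Outlet`, `HANDLERS`, `collect_asset_changes`, the handler functions, and the plain-dict shape of `events` (keyed by outlet name) are all hypothetical stand-ins for the real payload and models.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Outlet:
    """Hypothetical stand-in for one task outlet sent in the API payload."""

    name: str
    uri: str
    asset_type: str  # "Asset", "AssetAlias", "AssetNameRef" or "AssetUriRef"


# (kind, key, payload) tuples; a stand-in for whatever would really be persisted.
Record = tuple[str, str, Any]


def _from_asset(outlet: Outlet, events: dict[str, dict]) -> list[Record]:
    # For a plain asset, the event itself is sent (events[obj] in the comment).
    event = events.get(outlet.name, {})
    return [("asset", outlet.name, event.get("extra"))]


def _from_alias(outlet: Outlet, events: dict[str, dict]) -> list[Record]:
    # For an alias, the nested alias events are sent
    # (events[obj].asset_alias_events in the comment).
    alias_events = events.get(outlet.name, {}).get("asset_alias_events", [])
    return [("alias", outlet.name, event) for event in alias_events]


def _from_name_ref(outlet: Outlet, events: dict[str, dict]) -> list[Record]:
    # Assumption: a name reference is identified by its name only.
    return [("name_ref", outlet.name, None)]


def _from_uri_ref(outlet: Outlet, events: dict[str, dict]) -> list[Record]:
    # Assumption: a URI reference is identified by its URI only.
    return [("uri_ref", outlet.uri, None)]


# One entry per asset type replaces the if/else chain.
HANDLERS: dict[str, Callable[[Outlet, dict], list[Record]]] = {
    "Asset": _from_asset,
    "AssetAlias": _from_alias,
    "AssetNameRef": _from_name_ref,
    "AssetUriRef": _from_uri_ref,
}


def collect_asset_changes(outlets: list[Outlet], events: dict[str, dict]) -> list[Record]:
    """Dispatch each outlet to the handler registered for its asset type."""
    records: list[Record] = []
    for outlet in outlets:
        handler = HANDLERS.get(outlet.asset_type)
        if handler is None:
            raise ValueError(f"unsupported asset type: {outlet.asset_type!r}")
        records.extend(handler(outlet, events))
    return records
```

   With this shape, supporting a new asset type would mean adding one handler and one `HANDLERS` entry, and an unknown type fails loudly instead of being silently skipped. Whether this fits the real `_register_asset_changes` logic would need checking against the linked code.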