amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925413204
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
     # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
     # retries from the task SDK now, we can handle using max_tries
     return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):
+    # One task only triggers one asset event for each asset with the same extra.
+    # This tuple[asset uri, extra] to sets alias names mapping is used to find whether
+    # there're assets with same uri but different extra that we need to emit more than one asset events.
+    asset_alias_names: dict[tuple[AssetUniqueKey, frozenset], set[str]] = defaultdict(set)
+    asset_name_refs: set[str] = set()
+    asset_uri_refs: set[str] = set()
+
+    for obj in task_outlets:
+        # Lineage can have other types of objects besides assets
+        if asset_type == "Asset":
+            asset_manager.register_asset_change(
+                task_instance=task_instance,

Review Comment:
   It looks like we still need those columns for the time being. Would it make sense to tackle this in a follow-up, as part of the work described here:
   > Also perhaps in future work we should make update Asset tables to use TI UUID to link instead.