ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925224982


##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -243,6 +252,19 @@ def ti_update_state(
             else:
                 updated_state = State.FAILED
         query = query.values(state=updated_state)
+    elif isinstance(ti_patch_payload, TISuccessStatePayload):
+        query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+        updated_state = ti_patch_payload.state
+        task_instance = session.get(TI, ti_id_str)

Review Comment:
   Do we "need" to get the TI? 



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
     # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
     # retries from the task SDK now, we can handle using max_tries
     return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):

Review Comment:
   This function feels out of place in the routes file; I think it would be
   better placed in airflow.models.assets, or perhaps on the asset manager.



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
     # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
     # retries from the task SDK now, we can handle using max_tries
     return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):

Review Comment:
   This also feels like it's a lot of code that should exist somewhere else.
   Was it copied? Did you write it all from scratch just for this PR?



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
     # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
     # retries from the task SDK now, we can handle using max_tries
     return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):
+    # One task only triggers one asset event for each asset with the same extra.
+    # This tuple[asset uri, extra] to sets alias names mapping is used to find whether
+    # there're assets with same uri but different extra that we need to emit more than one asset events.
+    asset_alias_names: dict[tuple[AssetUniqueKey, frozenset], set[str]] = defaultdict(set)
+    asset_name_refs: set[str] = set()
+    asset_uri_refs: set[str] = set()
+
+    for obj in task_outlets:
+        # Lineage can have other types of objects besides assets
+        if asset_type == "Asset":
+            asset_manager.register_asset_change(
+                task_instance=task_instance,

Review Comment:
   We are only passing a TaskInstance object here for the following use cases:
   
   ```
           if task_instance:
               event_kwargs.update(
                   source_task_id=task_instance.task_id,
                   source_dag_id=task_instance.dag_id,
                   source_run_id=task_instance.run_id,
                   source_map_index=task_instance.map_index,
               )
   ```
   
   We don't need a full TI; a TIKey would be enough for that if it makes things
   easier, i.e. so we don't need to get the full TI from the DB. But if we need
   to select those columns anyway, then it doesn't make much difference.
   
   Also, perhaps in future work we should update the Asset tables to link via
   the TI UUID instead.
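
A minimal sketch of the lighter-weight alternative described above, assuming a hypothetical `SourceTaskRef` tuple and a hypothetical `build_event_kwargs` helper (neither name exists in Airflow; they only illustrate that the four identifying fields are all the event needs):

```python
from typing import NamedTuple, Optional


class SourceTaskRef(NamedTuple):
    """Hypothetical stand-in for a TI key: just the four identifying fields."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int = -1


def build_event_kwargs(source: Optional[SourceTaskRef]) -> dict:
    """Build the source_* keyword arguments without needing a full TaskInstance."""
    event_kwargs: dict = {}
    # Compare against None explicitly rather than relying on tuple truthiness.
    if source is not None:
        event_kwargs.update(
            source_task_id=source.task_id,
            source_dag_id=source.dag_id,
            source_run_id=source.run_id,
            source_map_index=source.map_index,
        )
    return event_kwargs


ref = SourceTaskRef(dag_id="example_dag", task_id="produce", run_id="manual__1")
kwargs = build_event_kwargs(ref)
```

Whether this saves a database round trip depends on whether those four columns are already at hand in the route; if they have to be selected anyway, the gain is small.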


