ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926655219


##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -43,6 +44,7 @@
     SetRenderedFields,
     SetXCom,
     StartupDetails,
+    SucceedTask,

Review Comment:
   Lets be a bit more consistent with `TaskState` here:
   ```suggestion
       TaskSuccess,
   ```



##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -243,6 +244,17 @@ def ti_update_state(
             else:
                 updated_state = State.FAILED
         query = query.values(state=updated_state)
+    elif isinstance(ti_patch_payload, TISuccessStatePayload):
+        query = TI.duration_expression_update(ti_patch_payload.end_date, 
query, session.bind)
+        updated_state = ti_patch_payload.state
+        task_instance = session.get(TI, ti_id_str)
+        TI._register_asset_changes_int(

Review Comment:
   Nit/not even quite a nit: If this method is meant to be called from things 
outside the TI class then it shouldn't be prefixed with `_`



##########
task_sdk/src/airflow/sdk/api/client.py:
##########
@@ -136,6 +137,11 @@ def finish(self, id: uuid.UUID, state: TerminalTIState, 
when: datetime):
         body = TITerminalStatePayload(end_date=when, 
state=TerminalTIState(state))
         self.client.patch(f"task-instances/{id}/state", 
content=body.model_dump_json())
 
+    def succeed(self, id: uuid.UUID, when: datetime, task_outlets, 
outlet_events):
+        """Tell the API server that this TI has to succeed."""

Review Comment:
   ```suggestion
           """Tell the API server that this TI has succeeded."""
   ```



##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            asset_type = type(obj).__name__
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, 
asset_type=asset_type))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetProfile(name=obj.name, 
asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetProfile(uri=obj.uri, 
asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetAlias):
+                task_outlets.append(AssetProfile(asset_type=asset_type))
+                for asset_alias_event in events[obj].asset_alias_events:
+                    outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   For clarity lets pull this all out into a func called something like 
`process_assets` or `process_outlets`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to