ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926655219

##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -43,6 +44,7 @@
     SetRenderedFields,
     SetXCom,
     StartupDetails,
+    SucceedTask,

Review Comment:
   Let's be a bit more consistent with `TaskState` here:

   ```suggestion
       TaskSuccess,
   ```

##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -243,6 +244,17 @@ def ti_update_state(
                 else:
                     updated_state = State.FAILED
             query = query.values(state=updated_state)
+        elif isinstance(ti_patch_payload, TISuccessStatePayload):
+            query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+            updated_state = ti_patch_payload.state
+            task_instance = session.get(TI, ti_id_str)
+            TI._register_asset_changes_int(

Review Comment:
   Nit/not even quite a nit: if this method is meant to be called from outside the TI class, then it shouldn't be prefixed with `_`.

##########
task_sdk/src/airflow/sdk/api/client.py:
##########
@@ -136,6 +137,11 @@ def finish(self, id: uuid.UUID, state: TerminalTIState, when: datetime):
         body = TITerminalStatePayload(end_date=when, state=TerminalTIState(state))
         self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json())
 
+    def succeed(self, id: uuid.UUID, when: datetime, task_outlets, outlet_events):
+        """Tell the API server that this TI has to succeed."""

Review Comment:
   ```suggestion
           """Tell the API server that this TI has succeeded."""
   ```

##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
             _push_xcom_if_needed(result, ti)
 
+            task_outlets = []
+            outlet_events = []
+            events = context["outlet_events"]
+
+            for obj in ti.task.outlets or []:
+                # Lineage can have other types of objects besides assets
+                asset_type = type(obj).__name__
+                if isinstance(obj, Asset):
+                    task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                    outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+                elif isinstance(obj, AssetNameRef):
+                    task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                    # Send all events, filtering can be done in API server.
+                    outlet_events.append(attrs.asdict(events))  # type: ignore
+                elif isinstance(obj, AssetUriRef):
+                    task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                    # Send all events, filtering can be done in API server.
+                    outlet_events.append(attrs.asdict(events))  # type: ignore
+                elif isinstance(obj, AssetAlias):
+                    task_outlets.append(AssetProfile(asset_type=asset_type))
+                    for asset_alias_event in events[obj].asset_alias_events:
+                        outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   For clarity, let's pull this all out into a func called something like `process_assets` or `process_outlets`.
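
A rough sketch of what that extraction into `process_outlets` could look like. This is illustrative only, not the PR's code: the dataclasses below are hypothetical stand-ins for the real `Asset`, `AssetNameRef`, `AssetUriRef`, `AssetAlias` and `AssetProfile` types so the block runs on its own, `dataclasses.asdict` is swapped in for `attrs.asdict`, and "send all events" is modeled as sending every event dict in a plain mapping.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


# Hypothetical stand-ins for the Airflow SDK types (assumptions, not the real classes).
@dataclass(frozen=True)
class Asset:
    name: str
    uri: str


@dataclass(frozen=True)
class AssetNameRef:
    name: str


@dataclass(frozen=True)
class AssetUriRef:
    uri: str


@dataclass(frozen=True)
class AssetAlias:
    name: str


@dataclass
class AssetProfile:
    asset_type: str
    name: str | None = None
    uri: str | None = None


@dataclass
class AssetAliasEvent:
    source_alias_name: str
    dest_asset_name: str


@dataclass
class OutletEvent:
    extra: dict[str, Any] = field(default_factory=dict)
    asset_alias_events: list[AssetAliasEvent] = field(default_factory=list)


def process_outlets(outlets, events):
    """Return (task_outlets, outlet_events) for a task's outlets.

    `events` is assumed to be a mapping from outlet object to OutletEvent.
    """
    task_outlets: list[AssetProfile] = []
    outlet_events: list[dict[str, Any]] = []

    for obj in outlets or []:
        # Lineage can have other types of objects besides assets; those are skipped.
        asset_type = type(obj).__name__
        if isinstance(obj, Asset):
            task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
            outlet_events.append(asdict(events[obj]))
        elif isinstance(obj, AssetNameRef):
            task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
            # Send all events; filtering is left to the API server.
            outlet_events.extend(asdict(e) for e in events.values())
        elif isinstance(obj, AssetUriRef):
            task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
            # Send all events; filtering is left to the API server.
            outlet_events.extend(asdict(e) for e in events.values())
        elif isinstance(obj, AssetAlias):
            task_outlets.append(AssetProfile(asset_type=asset_type))
            outlet_events.extend(asdict(e) for e in events[obj].asset_alias_events)

    return task_outlets, outlet_events
```

With something along these lines, the body of `run` would shrink to a single call such as `task_outlets, outlet_events = process_outlets(ti.task.outlets, context["outlet_events"])`, and the branching could be unit-tested without spinning up a task.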