ashb commented on code in PR #45924: URL: https://github.com/apache/airflow/pull/45924#discussion_r1925221925
##########
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -54,12 +64,52 @@ class TIEnterRunningPayload(BaseModel):
 class TITerminalStatePayload(BaseModel):
     """Schema for updating TaskInstance to a terminal state (e.g., SUCCESS or FAILED)."""
-    state: TerminalTIState
+    state: Literal[
+        TerminalTIState.FAILED,
+        TerminalTIState.SKIPPED,
+        TerminalTIState.REMOVED,
+        TerminalTIState.FAIL_WITHOUT_RETRY,
+    ]
     end_date: UtcDateTime
     """When the task completed executing"""
+class TISuccessStatePayload(BaseModel):
+    """Schema for updating TaskInstance to success state."""
+
+    state: Annotated[
+        Literal[TerminalTIState.SUCCESS],
+        # Specify a default in the schema, but not in code, so Pydantic marks it as required.
+        WithJsonSchema(
+            {
+                "type": "string",
+                "enum": [TerminalTIState.SUCCESS],
+                "default": TerminalTIState.SUCCESS,
+            }
+        ),
+    ]
+
+    end_date: UtcDateTime
+    """When the task completed executing"""
+
+    task_outlets: Annotated[list[AssetNameAndUri], Field(default_factory=list)]
+    outlet_events: Annotated[list[Any], Field(default_factory=list)]
+    asset_type: str | None = None
+
+    @root_validator(pre=True)
+    def parse_json_fields(cls, values):
+        import json
+
+        if "task_outlets" in values and isinstance(values["task_outlets"], str):
+            values["task_outlets"] = json.loads(values["task_outlets"])
+
+        if "outlet_events" in values and isinstance(values["outlet_events"], str):
+            values["outlet_events"] = json.loads(values["outlet_events"])

Review Comment:
This seems _highly_ suspect. Why do we have to do any json parsing ourselves?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org