jedcunningham commented on code in PR #39585: URL: https://github.com/apache/airflow/pull/39585#discussion_r1621557409
##########
airflow/models/taskinstance.py:
##########
@@ -1580,13 +1581,39 @@ def _coalesce_to_orm_ti(*, ti: TaskInstancePydantic | TaskInstance, session: Ses

 @internal_api_call
 @provide_session
-def _defer_task(
+def _defer_task_from_task_deferred(
     ti: TaskInstance | TaskInstancePydantic, exception: TaskDeferred, session: Session = NEW_SESSION
 ) -> TaskInstancePydantic | TaskInstance:
     from airflow.models.trigger import Trigger

     # First, make the trigger entry
     trigger_row = Trigger.from_object(exception.trigger)
+    updated_ti = _defer_task(
+        ti=ti,
+        session=session,
+        trigger_row=trigger_row,
+        trigger_kwargs=exception.kwargs,
+        next_method=exception.method_name,
+        timeout=exception.timeout,
+    )
+
+    session.merge(updated_ti)
+    session.commit()
+    return updated_ti
+
+
+@internal_api_call
+@provide_session
+def _defer_task(

Review Comment:
What if we made `_defer_task` like this:

```
def _defer_task(*, exception=None, start_trigger_args=None):
```

(omitting some kwargs)

Basically, let it accept either an exception or `start_trigger_args`, and proceed from there. That should also let us have one place that actually creates the trigger record, instead of multiple helper methods.
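
To make the suggestion concrete, here is a rough sketch of that shape. It is not the PR's code and does not import Airflow: `FakeTrigger`, `FakeTaskDeferred`, `FakeStartTriggerArgs`, `DeferralPlan`, `make_trigger_row`, and `defer_task_sketch` are all hypothetical stand-ins, and the field names on the stand-in classes are assumptions chosen for illustration. The real function would also take `ti` and `session`, which are omitted here.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any


@dataclass
class FakeTrigger:
    """Hypothetical stand-in for a trigger object."""
    classpath: str
    kwargs: dict[str, Any] = field(default_factory=dict)


class FakeTaskDeferred(Exception):
    """Hypothetical stand-in for the exception raised when a task defers."""

    def __init__(self, trigger, method_name, kwargs=None, timeout=None):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout


@dataclass
class FakeStartTriggerArgs:
    """Hypothetical stand-in for the 'start from trigger' arguments."""
    trigger_cls: str
    next_method: str
    trigger_kwargs: dict[str, Any] = field(default_factory=dict)
    timeout: timedelta | None = None


@dataclass
class DeferralPlan:
    """What the sketch returns instead of an updated task instance."""
    trigger_row: dict[str, Any]
    next_method: str
    next_kwargs: dict[str, Any]
    timeout: timedelta | None


def make_trigger_row(trigger: FakeTrigger) -> dict[str, Any]:
    # Assumption: a plain dict stands in for the persisted trigger record.
    return {"classpath": trigger.classpath, "kwargs": dict(trigger.kwargs)}


def defer_task_sketch(*, exception=None, start_trigger_args=None) -> DeferralPlan:
    # Exactly one of the two sources must be supplied.
    if (exception is None) == (start_trigger_args is None):
        raise ValueError("pass exactly one of exception or start_trigger_args")

    if exception is not None:
        trigger = exception.trigger
        next_method = exception.method_name
        next_kwargs = exception.kwargs or {}
        timeout = exception.timeout
    else:
        trigger = FakeTrigger(
            classpath=start_trigger_args.trigger_cls,
            kwargs=start_trigger_args.trigger_kwargs,
        )
        next_method = start_trigger_args.next_method
        next_kwargs = {}
        timeout = start_trigger_args.timeout

    # The single place where the trigger record is created.
    trigger_row = make_trigger_row(trigger)
    return DeferralPlan(trigger_row, next_method, next_kwargs, timeout)
```

Both call paths normalize into the same four values before the trigger record is built, so the record creation does not need to be duplicated in a separate helper for each caller.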