hussein-awala commented on PR #32990:
URL: https://github.com/apache/airflow/pull/32990#issuecomment-1688933450

   I think it's ready :tada: 
   
   ## How it works:
   
   When we defer a task, we call the method `_trigger_timeout` to calculate the 
moment when we should cancel the trigger and the reason for this timeout. In 
the base operator there are 2 possible reasons:
   - execution_timeout: the same `execution_timeout` used for a normal 
task. It's calculated by subtracting the time already spent since `start_date` 
from the total execution timeout, and adding the remaining duration to the 
current moment.
   - trigger_timeout: the timeout provided to the method `defer`. We 
calculate the timeout moment by adding the timeout duration to the current 
moment.
   
   We choose the nearest moment as the timeout, along with its corresponding 
reason, then we provide them to the `TaskDeferred` exception, and the TI adds 
them to its row (I had to add a new column to simplify the implementation).
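
   The selection described above can be sketched roughly as follows. This is 
only an illustration, not the code from the PR: the function name 
`nearest_timeout` and its parameters are hypothetical, and it assumes the 
reasons are plain strings.

```python
from datetime import datetime, timedelta, timezone


def nearest_timeout(now, start_date, execution_timeout, trigger_timeout):
    """Return (timeout_moment, reason) for the earliest timeout, or (None, None).

    Hypothetical helper: the real `_trigger_timeout` may have a different shape.
    """
    candidates = []
    if execution_timeout is not None:
        # Remaining budget = total execution_timeout minus time already spent.
        remaining = execution_timeout - (now - start_date)
        candidates.append((now + remaining, "execution_timeout"))
    if trigger_timeout is not None:
        # The timeout passed to `defer` counts from the current moment.
        candidates.append((now + trigger_timeout, "trigger_timeout"))
    if not candidates:
        return None, None
    # Pick the candidate with the earliest moment.
    return min(candidates, key=lambda candidate: candidate[0])


now = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
start = now - timedelta(minutes=10)
moment, reason = nearest_timeout(now, start, timedelta(minutes=30), timedelta(hours=1))
print(moment, reason)
```

With 10 of the 30 minutes already spent, the execution timeout falls 20 
minutes from now, which is earlier than the one-hour trigger timeout.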
   
   When the timeout moment is reached, we cancel the trigger, update 
`next_method` to `__timeout__`, and then resume the TI execution.
   
   When the task_runner or the `resume_execution` method detects that 
`next_method` is `__timeout__`, it calls the method `handle_trigger_timeout`. 
This method calls `on_kill` and raises `AirflowTaskTimeout` if the reason is 
execution_timeout, or calls the method `on_defer_timeout` and returns its 
value if the reason is trigger_timeout (I added an `AirflowException` for any 
other reason, in case the user adds new reasons without handling them).
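
   A minimal sketch of that dispatch, under stated assumptions: the exception 
classes below are local stand-ins rather than the real Airflow ones, 
`SketchOperator` is a hypothetical class, and passing the reason as a string 
argument is an assumption about how it reaches the handler.

```python
class AirflowException(Exception):
    """Local stand-in, not the real Airflow class."""


class AirflowTaskTimeout(Exception):
    """Local stand-in, not the real Airflow class."""


class AirflowDeferTimeout(AirflowException):
    """Local stand-in for the exception named in this comment."""


class SketchOperator:
    """Hypothetical operator showing only the timeout dispatch."""

    def __init__(self):
        self.killed = False

    def on_kill(self):
        self.killed = True

    def on_defer_timeout(self):
        # Default behaviour described above: raise.
        raise AirflowDeferTimeout("trigger timed out")

    def execute_complete(self):
        return "done"

    def handle_trigger_timeout(self, reason):
        if reason == "execution_timeout":
            self.on_kill()
            raise AirflowTaskTimeout("execution timeout reached while deferred")
        if reason == "trigger_timeout":
            return self.on_defer_timeout()
        # Guard for reasons added by a subclass but never handled.
        raise AirflowException(f"unhandled timeout reason: {reason}")

    def resume_execution(self, next_method, reason=None):
        if next_method == "__timeout__":
            return self.handle_trigger_timeout(reason)
        # Otherwise resume normally by calling the named method.
        return getattr(self, next_method)()
```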
   
   By default the method `on_defer_timeout` raises `AirflowDeferTimeout`, but 
the user can override it to:
   - raise a different exception
   - return a value (it will be treated as a normal return value from the 
execute method and pushed to XCom)
   - do something and then re-defer the task, which could be very 
helpful when we want to defer for X seconds without implementing this 
explicitly in the trigger or failing the task.
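
   To illustrate two of these overrides, here is a small sketch. Everything in 
it is a stand-in: `TaskDeferred` and `AirflowDeferTimeout` are local classes 
rather than the real Airflow ones, and `BaseSketch`, `ReturnFallback`, 
`RetryDeferral` and the `timeout_seconds` attribute are hypothetical names. 
In a real operator the re-deferral would go through `defer`.

```python
class TaskDeferred(Exception):
    """Local stand-in for the exception that signals a (re-)deferral."""

    def __init__(self, timeout_seconds):
        super().__init__(f"deferred for {timeout_seconds}s")
        self.timeout_seconds = timeout_seconds


class AirflowDeferTimeout(Exception):
    """Local stand-in for the default exception named in this comment."""


class BaseSketch:
    def on_defer_timeout(self):
        # Default: fail the task.
        raise AirflowDeferTimeout("trigger timed out")


class ReturnFallback(BaseSketch):
    def on_defer_timeout(self):
        # Treated like a normal return value from execute.
        return "fallback"


class RetryDeferral(BaseSketch):
    def __init__(self, max_deferrals):
        self.remaining = max_deferrals

    def on_defer_timeout(self):
        # Re-defer a limited number of times, then fall back to the default.
        if self.remaining > 0:
            self.remaining -= 1
            raise TaskDeferred(timeout_seconds=60)
        return super().on_defer_timeout()
```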
   
   ### What about base sensor?
   
   The base sensor overrides the method `_trigger_timeout` to compare with its 
own timeout, adding the reason `sensor_timeout` if that is the nearest moment. 
It also overrides the method `handle_trigger_timeout` to handle the new reason 
by raising `AirflowSensorTimeout`. Once `AirflowSensorTimeout` is raised, the 
TI state is set to failed regardless of the number of remaining attempts.
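
   The two sensor overrides might look roughly like this. It is a sketch 
only: the classes are hypothetical, `AirflowSensorTimeout` is a local 
stand-in, and measuring the sensor timeout from a `started_at` attribute is 
an assumption made for the example.

```python
from datetime import datetime, timedelta, timezone


class AirflowSensorTimeout(Exception):
    """Local stand-in, not the real Airflow class."""


class BaseSketch:
    def __init__(self, trigger_timeout):
        self.trigger_timeout = trigger_timeout

    def _trigger_timeout(self, now):
        return now + self.trigger_timeout, "trigger_timeout"

    def handle_trigger_timeout(self, reason):
        raise RuntimeError(f"base handling for {reason}")


class SensorSketch(BaseSketch):
    def __init__(self, trigger_timeout, sensor_timeout, started_at):
        super().__init__(trigger_timeout)
        self.sensor_timeout = sensor_timeout
        self.started_at = started_at  # assumption: sensor timeout counts from here

    def _trigger_timeout(self, now):
        moment, reason = super()._trigger_timeout(now)
        sensor_moment = self.started_at + self.sensor_timeout
        # Keep the base result unless the sensor timeout comes first.
        if sensor_moment < moment:
            return sensor_moment, "sensor_timeout"
        return moment, reason

    def handle_trigger_timeout(self, reason):
        if reason == "sensor_timeout":
            raise AirflowSensorTimeout("sensor timed out")
        return super().handle_trigger_timeout(reason)
```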
   
   ### What's next?
   
   Once this PR is merged, we can override the method `handle_trigger_timeout` 
in the operators which provide `timeout` to the defer method, as a workaround 
for this bug, by just raising `AirflowTaskTimeout` or `AirflowSensorTimeout`. 
This method will be used by Airflow 2.7.1 to make timeout handling identical 
in sync and async modes, and it will be skipped by older versions, so the 
workaround will still work there, although the task will raise 
`TaskDeferralError` and the `on_kill` method will not be called. That's 
sufficient for backward compatibility.
   
   closes: #32638
   closes: #32580
   closes: #19929
   
   I hope we can merge this before 2.7.1.

