paultmathew commented on code in PR #67229:
URL: https://github.com/apache/airflow/pull/67229#discussion_r3277843786
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -908,6 +909,21 @@ def invoke_defer_method(
trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
+ # Translate ``execution_timeout`` into an absolute deadline plumbed into
+ # the trigger.
+ execution_deadline: float | None = None
+ defer_timeout: datetime.timedelta | None = None
+ if self.execution_timeout is not None and context is not None:
+     ti = context.get("ti")
+     ti_start_date = getattr(ti, "start_date", None)
+     if ti_start_date is not None:
+         execution_deadline = ti_start_date.timestamp() + self.execution_timeout.total_seconds()
+         # Set ``defer.timeout`` so the framework also bounds the
+         # trigger's lifetime (the triggerer enforces this via
+         # ``trigger_timeout``).
+         remaining = execution_deadline - time.time()
+         defer_timeout = datetime.timedelta(seconds=max(0.0, remaining))
Review Comment:
Fixed by clamping `defer_timeout` to a 60-second minimum buffer:
remaining = execution_deadline - time.time()
defer_timeout = max(
    datetime.timedelta(seconds=remaining),
    datetime.timedelta(seconds=60),
)
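To check the clamp in isolation, here is a minimal standalone sketch of the same arithmetic. The helper name `compute_defer_timeout`, its `now` parameter (injected so the clock can be fixed without `time_machine`), and the `MIN_DEFER_TIMEOUT` constant are hypothetical and not part of the PR; the sketch also takes the TI start date directly instead of reading it from `context`.

```python
from __future__ import annotations

import datetime
import time

# Hypothetical constant matching the 60s minimum buffer described above.
MIN_DEFER_TIMEOUT = datetime.timedelta(seconds=60)


def compute_defer_timeout(
    ti_start_date: datetime.datetime | None,
    execution_timeout: datetime.timedelta | None,
    now: float | None = None,
) -> tuple[float | None, datetime.timedelta | None]:
    """Return ``(execution_deadline, defer_timeout)``, or ``(None, None)``.

    Hypothetical helper: the deadline is in epoch seconds, like the diff's
    ``execution_deadline``; ``now`` defaults to ``time.time()``.
    """
    if execution_timeout is None or ti_start_date is None:
        return None, None
    # Absolute deadline = TI start + execution_timeout (epoch seconds).
    execution_deadline = ti_start_date.timestamp() + execution_timeout.total_seconds()
    current = time.time() if now is None else now
    remaining = execution_deadline - current
    # Clamp to the minimum so the framework never gets a zero or negative
    # timeout; a deadline already in the past still yields a 60s backstop.
    defer_timeout = max(datetime.timedelta(seconds=remaining), MIN_DEFER_TIMEOUT)
    return execution_deadline, defer_timeout


start = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
limit = datetime.timedelta(seconds=300)
print(compute_defer_timeout(start, limit, now=start.timestamp() + 100)[1])  # 0:03:20
print(compute_defer_timeout(start, limit, now=start.timestamp() + 600)[1])  # 0:01:00
```

With a 300s `execution_timeout`, calling it 100s after the start leaves the real 200s remaining, while calling it 600s after the start hits the 60s floor instead of producing a negative or zero timeout.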
Rationale for the 60s buffer (vs the alternatives you suggested):
- **Don't set `timeout` when `remaining <= 0`**: works, but loses the
framework backstop entirely. If the trigger hangs (bug, network partition,
etc.), the task stays deferred forever.
- **Fail immediately when `remaining <= 0`**: cleanest in theory, but
invasive, since it would need to raise an exception from `invoke_defer_method`
and route through cleanup. That is a bigger refactor than the bug warrants.
- **60s minimum buffer** (chosen): the trigger's first-iteration deadline
check (top of `run()`) fires within ~`poll_interval` seconds (default 2s) and
emits the operator-handled `status="timeout"` event. The 60s framework backstop
only fires if the trigger is actually hung. Best of both worlds.
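The 60s buffer is only safe because the trigger checks the deadline before doing anything else on each iteration. The following stdlib-only sketch illustrates that ordering; `poll_with_deadline`, `is_pod_done`, and `collect` are hypothetical names, and the event dicts are simplified stand-ins, not the actual trigger code or its event payloads.

```python
from __future__ import annotations

import asyncio
import time
from collections.abc import AsyncIterator, Callable


async def poll_with_deadline(
    is_pod_done: Callable[[], bool],
    execution_deadline: float | None,
    poll_interval: float = 2.0,
) -> AsyncIterator[dict[str, str]]:
    """Hypothetical trigger loop that yields exactly one event and stops."""
    while True:
        # The deadline check comes first in every iteration, so an expired
        # deadline is reported before any polling or sleeping happens.
        if execution_deadline is not None and time.time() >= execution_deadline:
            yield {"status": "timeout"}
            return
        if is_pod_done():
            yield {"status": "success"}
            return
        await asyncio.sleep(poll_interval)


async def collect(events: AsyncIterator[dict[str, str]]) -> list[dict[str, str]]:
    """Drain the generator into a list (helper for trying the sketch)."""
    return [event async for event in events]


# Deadline already 300s in the past: prints [{'status': 'timeout'}]
print(asyncio.run(collect(poll_with_deadline(lambda: False, time.time() - 300))))
```

In this sketch an already-expired deadline produces the timeout event on the first iteration, so a `defer.timeout` floor of 60s would only be reached if the loop itself stopped making progress.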
Added test
`test_invoke_defer_method_clamps_defer_timeout_to_minimum_buffer_when_deadline_close`
that uses `time_machine.travel(ti_start + 600s)` to put the deadline 300s in
the past and asserts `defer.timeout == timedelta(seconds=60)`.
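The numbers in that test work out as follows. The 300s `execution_timeout` is inferred from the description (travelling 600s past `ti_start` leaves the deadline 300s in the past), not read from the test itself:

```latex
\begin{aligned}
t_{\text{deadline}} &= t_{\text{start}} + 300\,\text{s} \\
t_{\text{now}} &= t_{\text{start}} + 600\,\text{s} \\
\text{remaining} &= t_{\text{deadline}} - t_{\text{now}} = -300\,\text{s} \\
\texttt{defer.timeout} &= \max(-300\,\text{s},\ 60\,\text{s}) = 60\,\text{s}
\end{aligned}
```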
Open to revisiting if you'd prefer a different minimum (or one of the
alternative approaches).
--
This is an automated message from the Apache Git Service.