paultmathew opened a new issue, #67227:
URL: https://github.com/apache/airflow/issues/67227

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-cncf-kubernetes` (current main, reproduces against 
[`pod.py` at 
HEAD](https://github.com/apache/airflow/blob/main/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py))
   
   ### Apache Airflow version
   
   main (also reproduces on Airflow 3.2.x)
   
   ### Operating System
   
   Linux (EKS)
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   KubernetesExecutor on EKS, deferrable mode enabled.
   
   ### What happened
   
   When using `KubernetesPodOperator(deferrable=True)` with `execution_timeout` 
set, the Airflow task does not time out and the underlying pod continues to run 
well past the configured `execution_timeout`.
   
   In non-deferrable mode, exceeding `execution_timeout` raises 
`AirflowTaskTimeout`, the task fails, and `on_kill()` deletes the pod. In 
deferrable mode the task transitions to `DEFERRED` immediately after the pod is 
launched, the synchronous `execute()` returns, and the `signal.alarm`-based 
timeout context manager that wraps `execute()` exits cleanly. There is no 
further enforcement of `execution_timeout` for the lifetime of the deferral, so 
the trigger keeps polling the pod indefinitely (bounded only by 
`active_deadline_seconds` on the pod itself, which defaults to ~1h or whatever 
the operator passed in).
   
   This creates inconsistent behaviour between deferrable and non-deferrable 
execution modes.
   
   This is the same class of issue addressed for `DbtCloudRunJobOperator` 
([#61467](https://github.com/apache/airflow/issues/61467) → [PR 
#66449](https://github.com/apache/airflow/pull/66449)) and 
`AirbyteTriggerSyncOperator` 
([#64048](https://github.com/apache/airflow/issues/64048) → [PR 
#64051](https://github.com/apache/airflow/pull/64051)).
   
   ### What you think should happen instead
   
   `execution_timeout` should be enforced consistently regardless of execution 
mode.
   
   When a deferrable `KubernetesPodOperator` exceeds `execution_timeout`:
   
   - the Airflow task should fail due to execution timeout
   - the underlying pod should be deleted (kubelet sends SIGTERM, respecting 
`terminationGracePeriodSeconds`)
   
   This ensures predictable timeout behaviour and prevents long-running or 
orphaned pods.
   
   ### How to reproduce
   
   ```python
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator
   
   with DAG(
       dag_id="kpo_deferrable_execution_timeout_repro",
       start_date=datetime(2026, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       KubernetesPodOperator(
           task_id="run_long_pod",
           namespace="default",
           image="alpine:3.20",
           cmds=["sh", "-c"],
           arguments=["sleep 1800"],
           deferrable=True,
           execution_timeout=timedelta(seconds=30),
       )
   ```
   
   1. Trigger the DAG.
   2. Observe that the task transitions to `DEFERRED` within seconds.
   3. Wait past the 30-second `execution_timeout`.
   
   **Observed behaviour**
   
   - The Airflow task remains in `DEFERRED` state.
   - The pod continues running for the full 30 minutes (or until 
`active_deadline_seconds` fires, whichever comes first).
   - The task is never marked as `failed` due to timeout.
   
   **Expected behaviour**
   
   - The Airflow task should be marked `failed` shortly after 30 seconds.
   - The pod should be deleted.
   
   ### Anything else
   
   **Root cause**
   
   `KubernetesPodOperator.execute()` calls `self.defer(trigger=trigger, 
method_name="trigger_reentry")` 
([`pod.py:952`](https://github.com/apache/airflow/blob/main/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py#L952))
 without passing a `timeout=` kwarg. As a result, the resulting `Trigger` row 
has `trigger_timeout=NULL`, the triggerer's `RunTrigger.timeout_after` is 
`None` 
([`triggerer_job_runner.py:786,795`](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L786-L795)),
 and the trigger has no upper bound on its lifetime.
   
   The framework-level `execution_timeout` enforcement is currently a no-op for 
any deferred task — the wrapping `with timeout(...)` block in 
[`task_runner.py:1789`](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1789)
 only covers the synchronous portion of `execute()`, which exits cleanly when 
`TaskDeferred` is raised. There is a literal `# TODO: handle timeout in case of 
deferral` at 
[`task_runner.py:1782`](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L1782)
 acknowledging this gap.
   
   This issue addresses the operator-specific symptom for 
`KubernetesPodOperator`, mirroring the pattern already merged for Airbyte and 
DbtCloud.
   
   **Proposed fix**
   
   Mirror the pattern from PR #64051:
   
   1. Compute an absolute `execution_deadline` from `self.execution_timeout` 
before deferring.
   2. Pass `timeout=self.execution_timeout` to `self.defer(...)` so the Trigger 
has a hard wait deadline.
   3. Pass `execution_deadline` (or equivalent) into `KubernetesPodTrigger` so 
it can emit a timeout event when the deadline is exceeded.
   4. Handle the timeout event in `trigger_reentry` / `execute_complete`, 
deleting the pod (best-effort; cancellation failures should be logged but not 
mask the timeout).
   5. The same logic must apply on re-deferral via `trigger_reentry` when 
`logging_interval` is set — each subsequent `defer()` should pass the 
**remaining** budget, not the full `execution_timeout`.
   
   `KubernetesPodTrigger` already has `safe_to_cancel`/`cleanup` (Airflow ≤ 
3.2) and the new `BaseTrigger.on_kill()` (Airflow 3.3.0+, [PR 
#65590](https://github.com/apache/airflow/pull/65590)) for pod deletion, so the 
pod-cleanup half of the contract is already handled — only the 
`execution_deadline` plumbing is needed.
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to