Copilot commented on code in PR #67229:
URL: https://github.com/apache/airflow/pull/67229#discussion_r3276779730


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py:
##########
@@ -306,6 +313,27 @@ async def _wait_for_container_completion(self) -> 
TriggerEvent:
         if self.logging_interval is not None:
             time_get_more_logs = time_begin + 
datetime.timedelta(seconds=self.logging_interval)
         while True:
+            # ``execution_deadline`` is the operator's translation of the
+            # task-level ``execution_timeout`` into an absolute UTC timestamp
+            if self.execution_deadline is not None and time.time() >= 
self.execution_deadline:
+                self.log.info(

Review Comment:
   `execution_deadline` is only checked inside 
`_wait_for_container_completion()`. If a pod never leaves Pending / startup 
(i.e. `_wait_for_pod_start()` blocks), `execution_timeout` still won’t be 
enforced until the startup timeouts fire, which breaks the intended semantics. 
Consider checking `execution_deadline` before/within `_wait_for_pod_start()` 
(or in `run()` before awaiting it) so tasks can time out during pod start as 
well, and add a regression test for the Pending/startup path.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py:
##########
@@ -908,6 +909,21 @@ def invoke_defer_method(
 
         trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
 
+        # Translate ``execution_timeout`` into an absolute deadline plumbed 
into
+        # the trigger.
+        execution_deadline: float | None = None
+        defer_timeout: datetime.timedelta | None = None
+        if self.execution_timeout is not None and context is not None:
+            ti = context.get("ti")
+            ti_start_date = getattr(ti, "start_date", None)
+            if ti_start_date is not None:
+                execution_deadline = ti_start_date.timestamp() + 
self.execution_timeout.total_seconds()
+                # Set ``defer.timeout`` so the framework also bounds the
+                # trigger's lifetime (the triggerer enforces this via
+                # ``trigger_timeout``).
+                remaining = execution_deadline - time.time()
+                defer_timeout = datetime.timedelta(seconds=max(0.0, remaining))

Review Comment:
   `defer_timeout` is computed as `timedelta(seconds=max(0, remaining))`. When 
`remaining <= 0` this becomes `timedelta(0)`, which will set `trigger_timeout` 
to “now” and can cause the scheduler to fail the deferred TI with 
`TRIGGER_TIMEOUT` before the trigger gets a chance to emit the operator-handled 
`timeout` event (bypassing `_clean()`/pod deletion). Consider handling 
`remaining <= 0` explicitly (e.g. don’t set `timeout` in that case and let the 
trigger emit the timeout event, or fail immediately and run cleanup), and 
ensure `timeout` cannot be an already-expired value.
   



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:
##########
@@ -263,6 +289,100 @@ async def test_run_loop_return_failed_event(self, 
mock_hook, mock_method, mock_w
 
         assert actual_event == expected_event
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def 
test_run_loop_emits_timeout_event_when_execution_deadline_reached(
+        self, mock_hook, mock_define_state, mock_wait_pod
+    ):
+        """
+        When ``execution_deadline`` (the operator's translation of
+        ``execution_timeout``) has already passed, the trigger emits a
+        ``timeout`` event immediately on the first iteration of the
+        completion loop instead of polling indefinitely. The operator's
+        existing terminal-event path then fails the task and runs
+        ``on_finish_action`` (pod delete).
+        """
+        # Already-past deadline → first iteration trips it.
+        past_deadline = 1.0
+        trigger_with_deadline = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_dict=CONFIG_DICT,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+            schedule_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action=ON_FINISH_ACTION,
+            execution_deadline=past_deadline,
+        )
+        # Force the run to enter ``_wait_for_container_completion`` (the 
deadline
+        # check lives at the top of that loop).
+        mock_wait_pod.return_value = ContainerState.RUNNING
+        # If the deadline check fails to fire we'd fall through to a pod poll;
+        # make that path return RUNNING so we'd hang rather than emit any other
+        # terminal event — verifies the test fails loudly on regression.
+        mock_define_state.return_value = ContainerState.RUNNING
+        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.AsyncMock())
+
+        actual_event = await trigger_with_deadline.run().asend(None)
+
+        assert actual_event.payload["status"] == "timeout"
+        assert actual_event.payload["namespace"] == NAMESPACE
+        assert actual_event.payload["name"] == POD_NAME
+        assert "execution_timeout" in actual_event.payload["message"]
+        assert actual_event.payload["last_log_time"] is None
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def 
test_run_loop_does_not_emit_timeout_when_execution_deadline_not_reached(
+        self, mock_hook, mock_define_state, mock_wait_pod
+    ):
+        """
+        When ``execution_deadline`` is still in the future, the trigger keeps
+        polling normally — proves the deadline check doesn't fire preemptively
+        on every run.
+        """
+        # Far-future deadline (~year 2286) — guaranteed not reached.
+        future_deadline = 9_999_999_999.0
+        trigger_with_deadline = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_dict=CONFIG_DICT,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+            schedule_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action=ON_FINISH_ACTION,
+            execution_deadline=future_deadline,
+        )
+        mock_wait_pod.return_value = ContainerState.RUNNING
+        mock_define_state.return_value = ContainerState.RUNNING
+        mock_hook.get_pod.return_value = 
self._mock_pod_result(mock.AsyncMock())
+
+        # Trigger must keep waiting (not emit any event yet).
+        task = asyncio.create_task(trigger_with_deadline.run().__anext__())
+        await asyncio.sleep(0.5)
+        assert not task.done()

Review Comment:
   This test uses a real `asyncio.sleep(0.5)` to assert the trigger keeps 
polling. This adds noticeable wall-clock time to the unit suite and can be 
flaky on slow CI. Consider avoiding real sleeps by using `asyncio.wait_for(..., 
timeout=small)` to assert the first `__anext__()` does not complete 
immediately, or patching `asyncio.sleep`/`poll_interval` to a controllable fast 
path.
   



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py:
##########
@@ -2627,6 +2628,96 @@ def test_async_create_pod_should_throw_exception(self, 
mocked_hook, mocked_clean
         log_message = "Trigger emitted an %s event, failing the task: %s"
         mocked_log.error.assert_called_once_with(log_message, status, message)
 
+    @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
+    
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+    def 
test_invoke_defer_method_passes_execution_deadline_when_execution_timeout_set(
+        self, mocked_get_connection, mocked_convert_config
+    ):
+        """
+        ``execution_timeout`` is translated into an absolute 
``execution_deadline``
+        plumbed through to the trigger. Anchoring on ``ti.start_date`` keeps 
the
+        deadline stable across re-deferrals, which Airflow does not do for
+        ``execution_timeout`` on deferred tasks.
+        """
+        mocked_get_connection.side_effect = 
AirflowNotFoundException("connection not found")
+
+        k = KubernetesPodOperator(
+            task_id=TEST_TASK_ID,
+            namespace=TEST_NAMESPACE,
+            image=TEST_IMAGE,
+            name=TEST_NAME,
+            on_finish_action="keep_pod",
+            in_cluster=True,
+            deferrable=True,
+            execution_timeout=datetime.timedelta(seconds=300),
+        )
+        # Skip the pod-creation path and pretend it's already running.
+        k.pod = MagicMock()
+        k.pod.metadata.name = TEST_NAME
+        k.pod.metadata.namespace = TEST_NAMESPACE
+
+        ti_mock = MagicMock()
+        ti_start = datetime.datetime(2026, 1, 1, 12, 0, 0, 
tzinfo=datetime.timezone.utc)
+        ti_mock.start_date = ti_start
+        context = {"ti": ti_mock}
+
+        with patch(
+            f"{TRIGGER_CLASS}.define_pod_container_state",
+            return_value=ContainerState.RUNNING,
+        ):
+            with pytest.raises(TaskDeferred) as exc:
+                k.invoke_defer_method(context=context)
+
+        trigger = exc.value.trigger
+        assert isinstance(trigger, KubernetesPodTrigger)
+        # Deadline = start_date + execution_timeout (300s).
+        expected_deadline = ti_start.timestamp() + 300.0
+        assert trigger.execution_deadline == pytest.approx(expected_deadline, 
abs=0.001)
+        # ``defer.timeout`` is set so the framework also bounds the trigger's
+        # lifetime via ``trigger_timeout`` as a backstop. Should be roughly the
+        # remaining budget; here ti_start is in the past, so this depends on
+        # ``time.time()``. Just assert it's set (non-None).
+        assert exc.value.timeout is not None

Review Comment:
   This test only asserts `exc.value.timeout is not None`, and the chosen 
`ti_start` makes the computed `remaining` depend on the current wall clock (and 
can easily produce `timedelta(0)`), so it doesn’t verify that the operator 
passes the *remaining* budget correctly. Consider freezing/patching 
`time.time()` to a deterministic value and asserting `exc.value.timeout` is 
approximately the expected remaining duration (and similarly asserting it stays 
`None` when `execution_timeout` is unset).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to