paultmathew commented on code in PR #67229:
URL: https://github.com/apache/airflow/pull/67229#discussion_r3277847361


##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:
##########
@@ -263,6 +289,100 @@ async def test_run_loop_return_failed_event(self, mock_hook, mock_method, mock_w
 
         assert actual_event == expected_event
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def test_run_loop_emits_timeout_event_when_execution_deadline_reached(
+        self, mock_hook, mock_define_state, mock_wait_pod
+    ):
+        """
+        When ``execution_deadline`` (the operator's translation of
+        ``execution_timeout``) has already passed, the trigger emits a
+        ``timeout`` event immediately on the first iteration of the
+        completion loop instead of polling indefinitely. The operator's
+        existing terminal-event path then fails the task and runs
+        ``on_finish_action`` (pod delete).
+        """
+        # Already-past deadline → first iteration trips it.
+        past_deadline = 1.0
+        trigger_with_deadline = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_dict=CONFIG_DICT,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+            schedule_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action=ON_FINISH_ACTION,
+            execution_deadline=past_deadline,
+        )
+        # Force the run to enter ``_wait_for_container_completion`` (the deadline
+        # check lives at the top of that loop).
+        mock_wait_pod.return_value = ContainerState.RUNNING
+        # If the deadline check fails to fire we'd fall through to a pod poll;
+        # make that path return RUNNING so we'd hang rather than emit any other
+        # terminal event — verifies the test fails loudly on regression.
+        mock_define_state.return_value = ContainerState.RUNNING
+        mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
+
+        actual_event = await trigger_with_deadline.run().asend(None)
+
+        assert actual_event.payload["status"] == "timeout"
+        assert actual_event.payload["namespace"] == NAMESPACE
+        assert actual_event.payload["name"] == POD_NAME
+        assert "execution_timeout" in actual_event.payload["message"]
+        assert actual_event.payload["last_log_time"] is None
+
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
+    @mock.patch(f"{TRIGGER_PATH}.define_pod_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def test_run_loop_does_not_emit_timeout_when_execution_deadline_not_reached(
+        self, mock_hook, mock_define_state, mock_wait_pod
+    ):
+        """
+        When ``execution_deadline`` is still in the future, the trigger keeps
+        polling normally — proves the deadline check doesn't fire preemptively
+        on every run.
+        """
+        # Far-future deadline (~year 2286) — guaranteed not reached.
+        future_deadline = 9_999_999_999.0
+        trigger_with_deadline = KubernetesPodTrigger(
+            pod_name=POD_NAME,
+            pod_namespace=NAMESPACE,
+            base_container_name=BASE_CONTAINER_NAME,
+            kubernetes_conn_id=CONN_ID,
+            poll_interval=POLL_INTERVAL,
+            cluster_context=CLUSTER_CONTEXT,
+            config_dict=CONFIG_DICT,
+            in_cluster=IN_CLUSTER,
+            get_logs=GET_LOGS,
+            startup_timeout=STARTUP_TIMEOUT_SECS,
+            startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+            schedule_timeout=STARTUP_TIMEOUT_SECS,
+            trigger_start_time=TRIGGER_START_TIME,
+            on_finish_action=ON_FINISH_ACTION,
+            execution_deadline=future_deadline,
+        )
+        mock_wait_pod.return_value = ContainerState.RUNNING
+        mock_define_state.return_value = ContainerState.RUNNING
+        mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
+
+        # Trigger must keep waiting (not emit any event yet).
+        task = asyncio.create_task(trigger_with_deadline.run().__anext__())
+        await asyncio.sleep(0.5)
+        assert not task.done()

Review Comment:
   Replaced with:
   
       with pytest.raises(asyncio.TimeoutError):
           await asyncio.wait_for(trigger_with_deadline.run().__anext__(), timeout=0.05)
   
   This takes 10× less wall-clock time (~50 ms vs. 500 ms), and `wait_for`
   raising `TimeoutError` is the explicit "the trigger didn't emit anything"
   assertion — clearer intent than "wait, then check `task.done()`".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to