paultmathew commented on code in PR #67229:
URL: https://github.com/apache/airflow/pull/67229#discussion_r3277849636


##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py:
##########
@@ -2627,6 +2628,96 @@ def test_async_create_pod_should_throw_exception(self, 
mocked_hook, mocked_clean
         log_message = "Trigger emitted an %s event, failing the task: %s"
         mocked_log.error.assert_called_once_with(log_message, status, message)
 
+    @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
+    
@patch("airflow.providers.cncf.kubernetes.operators.pod.BaseHook.get_connection")
+    def 
test_invoke_defer_method_passes_execution_deadline_when_execution_timeout_set(
+        self, mocked_get_connection, mocked_convert_config
+    ):
+        """
+        ``execution_timeout`` is translated into an absolute 
``execution_deadline``
+        plumbed through to the trigger. Anchoring on ``ti.start_date`` keeps 
the
+        deadline stable across re-deferrals, which Airflow does not do for
+        ``execution_timeout`` on deferred tasks.
+        """
+        mocked_get_connection.side_effect = 
AirflowNotFoundException("connection not found")
+
+        k = KubernetesPodOperator(
+            task_id=TEST_TASK_ID,
+            namespace=TEST_NAMESPACE,
+            image=TEST_IMAGE,
+            name=TEST_NAME,
+            on_finish_action="keep_pod",
+            in_cluster=True,
+            deferrable=True,
+            execution_timeout=datetime.timedelta(seconds=300),
+        )
+        # Skip the pod-creation path and pretend it's already running.
+        k.pod = MagicMock()
+        k.pod.metadata.name = TEST_NAME
+        k.pod.metadata.namespace = TEST_NAMESPACE
+
+        ti_mock = MagicMock()
+        ti_start = datetime.datetime(2026, 1, 1, 12, 0, 0, 
tzinfo=datetime.timezone.utc)
+        ti_mock.start_date = ti_start
+        context = {"ti": ti_mock}
+
+        with patch(
+            f"{TRIGGER_CLASS}.define_pod_container_state",
+            return_value=ContainerState.RUNNING,
+        ):
+            with pytest.raises(TaskDeferred) as exc:
+                k.invoke_defer_method(context=context)
+
+        trigger = exc.value.trigger
+        assert isinstance(trigger, KubernetesPodTrigger)
+        # Deadline = start_date + execution_timeout (300s).
+        expected_deadline = ti_start.timestamp() + 300.0
+        assert trigger.execution_deadline == pytest.approx(expected_deadline, 
abs=0.001)
+        # ``defer.timeout`` is set so the framework also bounds the trigger's
+        # lifetime via ``trigger_timeout`` as a backstop. Should be roughly the
+        # remaining budget; here ti_start is in the past, so this depends on
+        # ``time.time()``. Just assert it's set (non-None).
+        assert exc.value.timeout is not None

Review Comment:
   Switched to `time_machine.travel(ti_start + 30s, tick=False)` and now 
asserts the exact value:
   
       elapsed = datetime.timedelta(seconds=30)
       with time_machine.travel(ti_start + elapsed, tick=False):
           ...
       assert exc.value.timeout == execution_timeout - elapsed   # 300s - 30s = 
270s
   
   `time_machine` was already imported in `test_pod.py:29` (used elsewhere in 
the file). Also added a sibling test 
`test_invoke_defer_method_clamps_defer_timeout_to_minimum_buffer_when_deadline_close`
 that travels to `ti_start + 600s` (300s past the deadline) and asserts the 60s 
minimum-buffer clamp behaviour from comment 2 above. The two tests now cover 
both the "normal remaining" and "past-deadline" paths deterministically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to