jykae commented on code in PR #67333:
URL: https://github.com/apache/airflow/pull/67333#discussion_r3308935375


##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py:
##########
@@ -1193,6 +1278,279 @@ def test_create_zero_parallelism_fails_validation(
         mock_hook.return_value.create_job.assert_not_called()
         mock_get_pods.assert_not_called()
 
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), 
new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_cleans_up_all_pods_on_success(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_find_pod,
+        mock_pod_manager_prop,
+    ):
+        """Every monitoring pod is deleted after a successful job, including 
under ``parallelism>1``.
+
+        Real ``post_complete_action`` runs; only the K8s SDK boundary is faked.
+        """
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        mock_hook.return_value.is_job_failed.return_value = False
+        pod_1 = _pod("pod-1")
+        pod_2 = _pod("pod-2")
+        mock_get_pods.return_value = [pod_1, pod_2]
+        mock_find_pod.side_effect = [pod_1, pod_2]
+        mock_hook.return_value.get_pod.side_effect = lambda name, namespace: (
+            pod_1 if name == "pod-1" else pod_2
+        )
+
+        op = KubernetesJobOperator(task_id="test_task_id", 
wait_until_job_complete=True, parallelism=2)
+        op.execute(context={"ti": mock.create_autospec(TaskInstance, 
instance=True)})
+
+        assert sorted(pm.deleted) == ["pod-1", "pod-2"]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), 
new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_cleans_up_pod_on_failure(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_find_pod,
+        mock_pod_manager_prop,
+    ):
+        """When the job fails, the monitoring pod is still deleted and the 
error propagates."""
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        mock_hook.return_value.is_job_failed.return_value = "Error"
+        pod = _pod("pod-1", phase="Failed")
+        mock_get_pods.return_value = [pod]
+        mock_find_pod.return_value = pod
+        mock_hook.return_value.get_pod.return_value = pod
+
+        op = KubernetesJobOperator(task_id="test_task_id", 
wait_until_job_complete=True)
+        with pytest.raises(AirflowException, match="is failed with error"):
+            op.execute(context={"ti": mock.create_autospec(TaskInstance, 
instance=True)})
+
+        assert pm.deleted == ["pod-1"]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), 
new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_keep_pod_skips_cleanup(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_pod_manager_prop,
+    ):
+        """``on_finish_action=keep_pod`` leaves the monitoring pod 
untouched."""
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        mock_hook.return_value.is_job_failed.return_value = False
+        pod = _pod("pod-1")
+        mock_get_pods.return_value = [pod]
+        mock_hook.return_value.get_pod.return_value = pod
+
+        op = KubernetesJobOperator(
+            task_id="test_task_id", wait_until_job_complete=True, 
on_finish_action="keep_pod"
+        )
+        op.execute(context={"ti": mock.create_autospec(TaskInstance, 
instance=True)})
+
+        assert pm.deleted == []
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), 
new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_cleanup_error_on_success_does_not_raise(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_find_pod,
+        mock_pod_manager_prop,
+    ):
+        """A delete failure on one pod must not stop cleanup of the others nor 
fail the task.
+
+        Exercises the real ``post_complete_action`` against a recording 
pod_manager
+        whose first ``delete_pod`` raises at the K8s SDK boundary.
+        """
+        mock_hook.return_value.is_job_failed.return_value = False
+        pod_1 = _pod("pod-1")
+        pod_2 = _pod("pod-2")
+        mock_get_pods.return_value = [pod_1, pod_2]
+        mock_find_pod.side_effect = [pod_1, pod_2]
+        mock_hook.return_value.get_pod.side_effect = lambda name, namespace: (
+            pod_1 if name == "pod-1" else pod_2
+        )
+        pm = _recording_pod_manager()
+        original_side_effect = pm.delete_pod.side_effect
+
+        def delete_side_effect(pod):
+            if pod.metadata.name == "pod-1":
+                raise ApiException(status=500, reason="boom")
+            original_side_effect(pod)
+
+        pm.delete_pod.side_effect = delete_side_effect
+        mock_pod_manager_prop.return_value = pm
+
+        op = KubernetesJobOperator(task_id="test_task_id", 
wait_until_job_complete=True, parallelism=2)
+        op.execute(context={"ti": mock.create_autospec(TaskInstance, 
instance=True)})
+
+        # pod-1's delete blew up; pod-2 still got deleted.
+        assert pm.deleted == ["pod-2"]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), 
new_callable=mock.PropertyMock)
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.find_pod"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_cleanup_error_does_not_mask_job_failure(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_find_pod,
+        mock_pod_manager_prop,
+    ):
+        """When the job already failed, a cleanup error must not replace the 
job-level error.
+
+        The recording side_effect lets us observe that cleanup was *attempted*
+        even when it raises — the contract is "attempt cleanup, swallow any
+        error, preserve the original job failure".
+        """
+        mock_hook.return_value.is_job_failed.return_value = "Error"
+        pod = _pod("pod-1", phase="Failed")
+        mock_get_pods.return_value = [pod]
+        mock_find_pod.return_value = pod
+        mock_hook.return_value.get_pod.return_value = pod
+        attempted: list[str] = []
+
+        def boom(pod):
+            attempted.append(pod.metadata.name)
+            raise ApiException(status=500, reason="boom")
+
+        pm = mock.MagicMock()
+        pm.delete_pod.side_effect = boom
+        mock_pod_manager_prop.return_value = pm
+
+        op = KubernetesJobOperator(task_id="test_task_id", 
wait_until_job_complete=True)
+        # The job-level error wins; the cleanup ApiException is swallowed.
+        with pytest.raises(AirflowException, match="is failed with error"):
+            op.execute(context={"ti": mock.create_autospec(TaskInstance, 
instance=True)})
+
+        # Cleanup was still attempted even though it raised.
+        assert attempted == ["pod-1"]
+
+    @pytest.mark.non_db_test_override
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.pod_manager"), 
new_callable=mock.PropertyMock)
+    
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.execute_deferrable"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
+    @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    @patch(HOOK_CLASS)
+    def test_execute_does_not_cleanup_when_deferring(
+        self,
+        mock_hook,
+        mock_create_job,
+        mock_build_job_request_obj,
+        mock_get_pods,
+        mock_execute_deferrable,
+        mock_pod_manager_prop,
+    ):
+        """A ``TaskDeferred`` on the way out of ``execute()`` must leave pods 
alive.
+
+        The trigger still needs to watch them; cleanup happens on resume in
+        ``execute_complete``.
+        """
+        pm = _recording_pod_manager()
+        mock_pod_manager_prop.return_value = pm
+        pod = _pod("pod-1")
+        mock_get_pods.return_value = [pod]
+        mock_execute_deferrable.side_effect = TaskDeferred(
+            trigger=mock.MagicMock(), method_name="execute_complete"
+        )

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to