ihorlukianov commented on code in PR #68674:
URL: https://github.com/apache/airflow/pull/68674#discussion_r3435122833


##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py:
##########
@@ -1536,6 +1536,44 @@ def 
test_alive_other_scheduler_job_ids_does_not_detach_caller_session(self, sess
             "_alive_other_scheduler_job_ids closed/detached the caller's 
scoped session"
         )
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+    
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+    @mock.patch(
+        
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
+    )
+    def test_sync_processes_completed_pods_once(
+        self, mock_delete_pod, mock_get_kube_client, 
mock_kubernetes_job_watcher
+    ):
+        """Adopted completed pods must not be re-deleted for every 
result-queue item."""
+        executor = self.kubernetes_executor
+        executor.start()
+        try:
+            completed_key = TaskInstanceKey(dag_id="dag", task_id="completed", 
run_id="run_id", try_number=1)
+            queue_key = TaskInstanceKey(dag_id="dag", task_id="queued", 
run_id="run_id", try_number=1)
+            executor.completed = {
+                KubernetesResults(
+                    completed_key,
+                    "completed",
+                    "completed-pod",
+                    "default",
+                    "1",
+                    None,
+                )
+            }
+            executor.result_queue.put(
+                KubernetesResults(queue_key, None, "queue-pod", "default", 
"2", None)
+            )
+            executor.result_queue.put(
+                KubernetesResults(queue_key, None, "queue-pod-2", "default", 
"3", None)
+            )
+
+            executor.sync()
+
+            assert mock_delete_pod.call_count == 3

Review Comment:
   done in 
https://github.com/apache/airflow/pull/68674/commits/a61ec7addd5b8a4b9f741d35c861255c1d79db99



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -813,11 +823,13 @@ def _adopt_completed_pods(self, kube_client: 
client.CoreV1Api) -> None:
                 continue
 
             ti_id = annotations_to_key(pod.metadata.annotations)
+            pod_name = pod.metadata.name
+            self.completed = {result for result in self.completed if 
result.pod_name != pod_name}

Review Comment:
   Done in 
https://github.com/apache/airflow/pull/68674/commits/a61ec7addd5b8a4b9f741d35c861255c1d79db99



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to