dirrao commented on code in PR #39406:
URL: https://github.com/apache/airflow/pull/39406#discussion_r1590205811


##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -599,6 +599,17 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
                 for pod in pod_list:
                     self.adopt_launched_task(kube_client, pod, 
tis_to_flush_by_key)
             self._adopt_completed_pods(kube_client)
+            # as this method can be retried within a short time frame
+            # (wrapped in a run_with_db_retries of scheduler_job_runner,
+            # and get retried due to an OperationalError, for example),
+            # there is a chance that in second attempt, adopt_launched_task 
will not be called even once
+            # as all pods are already adopted in the first attempt.
+            # and tis_to_flush_by_key will contain TIs that are already 
adopted.
+            # therefore, we need to check if the TIs are already adopted by 
the first attempt and remove them.
+            for ti in list(tis_to_flush_by_key.keys()):
+                if ti in self.running:

Review Comment:
   I have seen the leak in executor slots when multiple schedulers are running 
over the time. These leaks might cause some of the tasks left in the running 
queue. It will cause to avoid resetting them forever. 
   
   #36998 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to