This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8daa53eaa5 Avoid resetting adopted task instances when retrying for kubernetes executor (#39406)
8daa53eaa5 is described below
commit 8daa53eaa5d64727abd7430c9f58eb8a14613db2
Author: Vu Tan <[email protected]>
AuthorDate: Fri Jun 7 17:00:24 2024 +0900
Avoid resetting adopted task instances when retrying for kubernetes executor (#39406)
* Avoid resetting adopted task instances when retrying
* Stop using f-string when logging
* Address comment
* Remove return type of generator func
* Add unit test
* Add comment and fix linter error
---
.../kubernetes/executors/kubernetes_executor.py | 18 +++++++++++++--
.../executors/test_kubernetes_executor.py | 27 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index d2f9c61ce8..40a27d70fd 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -83,7 +83,6 @@ if TYPE_CHECKING:
AirflowKubernetesScheduler,
)
-
# CLI Args
ARG_NAMESPACE = Arg(
("--namespace",),
@@ -577,7 +576,22 @@ class KubernetesExecutor(BaseExecutor):
for pod in pod_list:
self.adopt_launched_task(kube_client, pod,
tis_to_flush_by_key)
self._adopt_completed_pods(kube_client)
- tis_to_flush.extend(tis_to_flush_by_key.values())
+
+ # as this method can be retried within a short time frame
+ # (wrapped in a run_with_db_retries of scheduler_job_runner,
+ # and get retried due to an OperationalError, for example),
+ # there is a chance that in second attempt, adopt_launched_task will not be called even once
+ # as all pods are already adopted in the first attempt.
+ # and tis_to_flush_by_key will contain TIs that are already adopted.
+ # therefore, we need to check if the TIs are already adopted by the first attempt and remove them.
+ def _iter_tis_to_flush():
+ for key, ti in tis_to_flush_by_key.items():
+ if key in self.running:
+ self.log.info("%s is already adopted, no need to
flush.", ti)
+ else:
+ yield ti
+
+ tis_to_flush.extend(_iter_tis_to_flush())
return tis_to_flush
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 9dffa3f778..f3831f3195 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -991,8 +991,35 @@ class TestKubernetesExecutor:
tis_to_flush = executor.try_adopt_task_instances([mock_ti])
assert tis_to_flush == [mock_ti]
+ assert executor.running == set()
+ mock_adopt_launched_task.assert_not_called()
+ mock_adopt_completed_pods.assert_called_once()
+
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+ @mock.patch(
+
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
+ )
+ @mock.patch(
+
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
+ )
+ def test_try_adopt_already_adopted_task_instances(
+ self, mock_adopt_completed_pods, mock_adopt_launched_task,
mock_kube_dynamic_client
+ ):
+ """For TIs that are already adopted, we should not flush them"""
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.get.return_value.items = []
+ mock_kube_client = mock.MagicMock()
+ executor = self.kubernetes_executor
+ executor.kube_client = mock_kube_client
+ ti_key = TaskInstanceKey("dag", "task", "run_id", 1)
+ mock_ti = mock.MagicMock(queued_by_job_id="1",
external_executor_id="1", key=ti_key)
+ executor.running = {ti_key}
+
+ tis_to_flush = executor.try_adopt_task_instances([mock_ti])
mock_adopt_launched_task.assert_not_called()
mock_adopt_completed_pods.assert_called_once()
+ assert tis_to_flush == []
+ assert executor.running == {ti_key}
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_adopt_launched_task(self, mock_kube_client):