This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 848830c6e2fcb6a4988afd210c365b5fb0550224 Author: Tanel Kiis <tan...@users.noreply.github.com> AuthorDate: Thu May 5 13:23:18 2022 +0300 Use kubernetes queue in kubernetes hybrid executors (#23048) When using "hybrid" executors (`CeleryKubernetesExecutor` or `LocalKubernetesExecutor`), then the `clear_not_launched_queued_tasks` mechanism in the `KubernetesExecutor` can reset the queued tasks that were given to the other executor. `KubernetesExecutor` should limit itself to the configured queue when working in the "hybrid" mode. (cherry picked from commit ae19eab3b4af98756200843805be882ce02a7d08) --- airflow/executors/celery_kubernetes_executor.py | 1 + airflow/executors/kubernetes_executor.py | 9 +++-- airflow/executors/local_kubernetes_executor.py | 1 + tests/executors/test_celery_kubernetes_executor.py | 10 ++++++ tests/executors/test_kubernetes_executor.py | 38 +++++++++++++++++++++- tests/executors/test_local_kubernetes_executor.py | 7 ++++ 6 files changed, 62 insertions(+), 4 deletions(-) diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py index 2802b6954e..5b934160ab 100644 --- a/airflow/executors/celery_kubernetes_executor.py +++ b/airflow/executors/celery_kubernetes_executor.py @@ -43,6 +43,7 @@ class CeleryKubernetesExecutor(LoggingMixin): self._job_id: Optional[int] = None self.celery_executor = celery_executor self.kubernetes_executor = kubernetes_executor + self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE @property def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]: diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 19a04216f2..9b9de71681 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -438,6 +438,7 @@ class KubernetesExecutor(BaseExecutor): self.scheduler_job_id: Optional[str] = None self.event_scheduler: Optional[EventScheduler] = None 
self.last_handled: Dict[TaskInstanceKey, float] = {} + self.kubernetes_queue: Optional[str] = None super().__init__(parallelism=self.kube_config.parallelism) @provide_session @@ -456,9 +457,11 @@ class KubernetesExecutor(BaseExecutor): self.log.debug("Clearing tasks that have not been launched") if not self.kube_client: raise AirflowException(NOT_STARTED_MESSAGE) - queued_tis: List[TaskInstance] = ( - session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all() - ) + + query = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED) + if self.kubernetes_queue: + query = query.filter(TaskInstance.queue == self.kubernetes_queue) + queued_tis: List[TaskInstance] = query.all() self.log.info('Found %s queued task instances', len(queued_tis)) # Go through the "last seen" dictionary and clean out old entries diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py index befbc325f6..cb1ddf7c9d 100644 --- a/airflow/executors/local_kubernetes_executor.py +++ b/airflow/executors/local_kubernetes_executor.py @@ -43,6 +43,7 @@ class LocalKubernetesExecutor(LoggingMixin): self._job_id: Optional[str] = None self.local_executor = local_executor self.kubernetes_executor = kubernetes_executor + self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE @property def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]: diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py index ebcbdba5f6..84ca14c5f0 100644 --- a/tests/executors/test_celery_kubernetes_executor.py +++ b/tests/executors/test_celery_kubernetes_executor.py @@ -19,6 +19,7 @@ from unittest import mock from parameterized import parameterized +from airflow.configuration import conf from airflow.executors.celery_executor import CeleryExecutor from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor from 
airflow.executors.kubernetes_executor import KubernetesExecutor @@ -213,3 +214,12 @@ class TestCeleryKubernetesExecutor: job_id = 'this-job-id' cel_k8s_exec.job_id = job_id assert cel_exec.job_id == k8s_exec.job_id == cel_k8s_exec.job_id == job_id + + def test_kubernetes_executor_knows_its_queue(self): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + + assert k8s_executor_mock.kubernetes_queue == conf.get( + 'celery_kubernetes_executor', 'kubernetes_queue' + ) diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 541e8453b8..a677fe598b 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -721,7 +721,17 @@ class TestKubernetesExecutor: ), ) - def test_clear_not_launched_queued_tasks_launched(self, dag_maker, create_dummy_dag, session): + @pytest.mark.parametrize( + 'task_queue, kubernetes_queue', + [ + pytest.param('default', None), + pytest.param('kubernetes', None), + pytest.param('kubernetes', 'kubernetes'), + ], + ) + def test_clear_not_launched_queued_tasks_launched( + self, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue + ): """Leave the state alone if a pod already exists""" mock_kube_client = mock.MagicMock() mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=["something"]) @@ -732,9 +742,11 @@ class TestKubernetesExecutor: ti = dag_run.task_instances[0] ti.state = State.QUEUED ti.queued_by_job_id = 1 + ti.queue = task_queue session.flush() executor = self.kubernetes_executor + executor.kubernetes_queue = kubernetes_queue executor.kube_client = mock_kube_client executor.clear_not_launched_queued_tasks(session=session) @@ -800,6 +812,30 @@ class TestKubernetesExecutor: any_order=True, ) + def test_clear_not_launched_queued_tasks_not_launched_other_queue( + self, dag_maker, create_dummy_dag, session + ): + 
"""Queued TI has no pod, but it is not queued for the k8s executor""" + mock_kube_client = mock.MagicMock() + mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[]) + + create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None) + dag_run = dag_maker.create_dagrun() + + ti = dag_run.task_instances[0] + ti.state = State.QUEUED + ti.queued_by_job_id = 1 + session.flush() + + executor = self.kubernetes_executor + executor.kubernetes_queue = 'kubernetes' + executor.kube_client = mock_kube_client + executor.clear_not_launched_queued_tasks(session=session) + + ti.refresh_from_db() + assert ti.state == State.QUEUED + assert mock_kube_client.list_namespaced_pod.call_count == 0 + class TestKubernetesJobWatcher(unittest.TestCase): def setUp(self): diff --git a/tests/executors/test_local_kubernetes_executor.py b/tests/executors/test_local_kubernetes_executor.py index b32e119244..274175f127 100644 --- a/tests/executors/test_local_kubernetes_executor.py +++ b/tests/executors/test_local_kubernetes_executor.py @@ -60,3 +60,10 @@ class TestLocalKubernetesExecutor: # Should be equal to Local Executor default parallelism. assert local_kubernetes_executor.slots_available == conf.getint('core', 'PARALLELISM') + + def test_kubernetes_executor_knows_its_queue(self): + local_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock) + + assert k8s_executor_mock.kubernetes_queue == conf.get('local_kubernetes_executor', 'kubernetes_queue')