This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 848830c6e2fcb6a4988afd210c365b5fb0550224
Author: Tanel Kiis <tan...@users.noreply.github.com>
AuthorDate: Thu May 5 13:23:18 2022 +0300

    Use kubernetes queue in kubernetes hybrid executors (#23048)
    
    When using "hybrid" executors (`CeleryKubernetesExecutor` or 
`LocalKubernetesExecutor`),
    then the `clear_not_launched_queued_tasks` mechnism in the 
`KubernetesExecutor` can
    reset the queued tasks, that were given to the other executor.
    
    `KuberneterExecutor` should limit itself to the configured queue when 
working in the
    "hybrid" mode.
    
    (cherry picked from commit ae19eab3b4af98756200843805be882ce02a7d08)
---
 airflow/executors/celery_kubernetes_executor.py    |  1 +
 airflow/executors/kubernetes_executor.py           |  9 +++--
 airflow/executors/local_kubernetes_executor.py     |  1 +
 tests/executors/test_celery_kubernetes_executor.py | 10 ++++++
 tests/executors/test_kubernetes_executor.py        | 38 +++++++++++++++++++++-
 tests/executors/test_local_kubernetes_executor.py  |  7 ++++
 6 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py
index 2802b6954e..5b934160ab 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -43,6 +43,7 @@ class CeleryKubernetesExecutor(LoggingMixin):
         self._job_id: Optional[int] = None
         self.celery_executor = celery_executor
         self.kubernetes_executor = kubernetes_executor
+        self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE
 
     @property
     def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 19a04216f2..9b9de71681 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -438,6 +438,7 @@ class KubernetesExecutor(BaseExecutor):
         self.scheduler_job_id: Optional[str] = None
         self.event_scheduler: Optional[EventScheduler] = None
         self.last_handled: Dict[TaskInstanceKey, float] = {}
+        self.kubernetes_queue: Optional[str] = None
         super().__init__(parallelism=self.kube_config.parallelism)
 
     @provide_session
@@ -456,9 +457,11 @@ class KubernetesExecutor(BaseExecutor):
         self.log.debug("Clearing tasks that have not been launched")
         if not self.kube_client:
             raise AirflowException(NOT_STARTED_MESSAGE)
-        queued_tis: List[TaskInstance] = (
-            session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
-        )
+
+        query = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED)
+        if self.kubernetes_queue:
+            query = query.filter(TaskInstance.queue == self.kubernetes_queue)
+        queued_tis: List[TaskInstance] = query.all()
         self.log.info('Found %s queued task instances', len(queued_tis))
 
         # Go through the "last seen" dictionary and clean out old entries
diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py
index befbc325f6..cb1ddf7c9d 100644
--- a/airflow/executors/local_kubernetes_executor.py
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -43,6 +43,7 @@ class LocalKubernetesExecutor(LoggingMixin):
         self._job_id: Optional[str] = None
         self.local_executor = local_executor
         self.kubernetes_executor = kubernetes_executor
+        self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE
 
     @property
     def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py
index ebcbdba5f6..84ca14c5f0 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/executors/test_celery_kubernetes_executor.py
@@ -19,6 +19,7 @@ from unittest import mock
 
 from parameterized import parameterized
 
+from airflow.configuration import conf
 from airflow.executors.celery_executor import CeleryExecutor
 from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
 from airflow.executors.kubernetes_executor import KubernetesExecutor
@@ -213,3 +214,12 @@ class TestCeleryKubernetesExecutor:
         job_id = 'this-job-id'
         cel_k8s_exec.job_id = job_id
         assert cel_exec.job_id == k8s_exec.job_id == cel_k8s_exec.job_id == job_id
+
+    def test_kubernetes_executor_knows_its_queue(self):
+        celery_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
+
+        assert k8s_executor_mock.kubernetes_queue == conf.get(
+            'celery_kubernetes_executor', 'kubernetes_queue'
+        )
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 541e8453b8..a677fe598b 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -721,7 +721,17 @@ class TestKubernetesExecutor:
             ),
         )
 
-    def test_clear_not_launched_queued_tasks_launched(self, dag_maker, create_dummy_dag, session):
+    @pytest.mark.parametrize(
+        'task_queue, kubernetes_queue',
+        [
+            pytest.param('default', None),
+            pytest.param('kubernetes', None),
+            pytest.param('kubernetes', 'kubernetes'),
+        ],
+    )
+    def test_clear_not_launched_queued_tasks_launched(
+        self, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue
+    ):
         """Leave the state alone if a pod already exists"""
         mock_kube_client = mock.MagicMock()
         mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=["something"])
@@ -732,9 +742,11 @@ class TestKubernetesExecutor:
         ti = dag_run.task_instances[0]
         ti.state = State.QUEUED
         ti.queued_by_job_id = 1
+        ti.queue = task_queue
         session.flush()
 
         executor = self.kubernetes_executor
+        executor.kubernetes_queue = kubernetes_queue
         executor.kube_client = mock_kube_client
         executor.clear_not_launched_queued_tasks(session=session)
 
@@ -800,6 +812,30 @@ class TestKubernetesExecutor:
             any_order=True,
         )
 
+    def test_clear_not_launched_queued_tasks_not_launched_other_queue(
+        self, dag_maker, create_dummy_dag, session
+    ):
+        """Queued TI has no pod, but it is not queued for the k8s executor"""
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])
+
+        create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
+        dag_run = dag_maker.create_dagrun()
+
+        ti = dag_run.task_instances[0]
+        ti.state = State.QUEUED
+        ti.queued_by_job_id = 1
+        session.flush()
+
+        executor = self.kubernetes_executor
+        executor.kubernetes_queue = 'kubernetes'
+        executor.kube_client = mock_kube_client
+        executor.clear_not_launched_queued_tasks(session=session)
+
+        ti.refresh_from_db()
+        assert ti.state == State.QUEUED
+        assert mock_kube_client.list_namespaced_pod.call_count == 0
+
 
 class TestKubernetesJobWatcher(unittest.TestCase):
     def setUp(self):
diff --git a/tests/executors/test_local_kubernetes_executor.py b/tests/executors/test_local_kubernetes_executor.py
index b32e119244..274175f127 100644
--- a/tests/executors/test_local_kubernetes_executor.py
+++ b/tests/executors/test_local_kubernetes_executor.py
@@ -60,3 +60,10 @@ class TestLocalKubernetesExecutor:
 
         # Should be equal to Local Executor default parallelism.
         assert local_kubernetes_executor.slots_available == conf.getint('core', 'PARALLELISM')
+
+    def test_kubernetes_executor_knows_its_queue(self):
+        local_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
+
+        assert k8s_executor_mock.kubernetes_queue == conf.get('local_kubernetes_executor', 'kubernetes_queue')
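
For reference, a minimal standalone sketch of the queue-aware filtering this commit
introduces. The helper name `queued_task_instances` is hypothetical; it assumes an
open SQLAlchemy `session` plus Airflow's `TaskInstance` model and `State` enum, and
mirrors the hunk in airflow/executors/kubernetes_executor.py above:

from typing import List, Optional

from airflow.models import TaskInstance
from airflow.utils.state import State


def queued_task_instances(session, kubernetes_queue: Optional[str]) -> List[TaskInstance]:
    # Start from all queued task instances, as the executor did before this change.
    query = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED)
    if kubernetes_queue:
        # In hybrid mode (CeleryKubernetesExecutor / LocalKubernetesExecutor) the
        # KubernetesExecutor only owns tasks routed to its configured queue, so it
        # must not touch queued tasks that belong to the other executor.
        query = query.filter(TaskInstance.queue == kubernetes_queue)
    return query.all()

When `kubernetes_queue` is None (a plain KubernetesExecutor), the behaviour is
unchanged and all queued task instances are returned.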
