This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7338912  Add task adoption to CeleryKubernetesExecutor (#11244)
7338912 is described below

commit 7338912a78b87be9abcec197bff609b63e396fea
Author: Daniel Imberman <[email protected]>
AuthorDate: Fri Oct 2 11:51:11 2020 -0700

    Add task adoption to CeleryKubernetesExecutor (#11244)
    
    Routes task adoption based on queue name to CeleryExecutor
    or KubernetesExecutor
    
    Co-authored-by: Daniel Imberman <[email protected]>
---
 airflow/executors/celery_kubernetes_executor.py    | 24 +++++++++++++++-
 tests/executors/test_celery_kubernetes_executor.py | 32 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/celery_kubernetes_executor.py 
b/airflow/executors/celery_kubernetes_executor.py
index 353474b..51c1e17 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import Dict, Optional, Set, Union
+from typing import Dict, List, Optional, Set, Union
 
 from airflow.configuration import conf
 from airflow.executors.base_executor import CommandType, EventBufferValueType, 
QueuedTaskInstanceType
@@ -131,6 +131,28 @@ class CeleryKubernetesExecutor(LoggingMixin):
 
         return {**cleared_events_from_celery, **cleared_events_from_kubernetes}
 
+    def try_adopt_task_instances(self, tis: List[TaskInstance]) -> 
List[TaskInstance]:
+        """
+        Try to adopt running task instances that have been abandoned by a 
SchedulerJob dying.
+
+        Anything that is not adopted will be cleared by the scheduler (and 
then become eligible for
+        re-scheduling)
+
+        :return: any TaskInstances that were unable to be adopted
+        :rtype: list[airflow.models.TaskInstance]
+        """
+        celery_tis = []
+        kubernetes_tis = []
+        abandoned_tis = []
+        for ti in tis:
+            if ti.queue == self.KUBERNETES_QUEUE:
+                kubernetes_tis.append(ti)
+            else:
+                celery_tis.append(ti)
+        
abandoned_tis.extend(self.celery_executor.try_adopt_task_instances(celery_tis))
+        
abandoned_tis.extend(self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis))
+        return abandoned_tis
+
     def end(self) -> None:
         """
         End celery and kubernetes executor
diff --git a/tests/executors/test_celery_kubernetes_executor.py 
b/tests/executors/test_celery_kubernetes_executor.py
index 31b7336..872b1b7 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/executors/test_celery_kubernetes_executor.py
@@ -201,6 +201,38 @@ class TestCeleryKubernetesExecutor:
         when_ti_in_k8s_executor()
         when_ti_in_celery_executor()
 
+    def test_adopt_tasks(self):
+        ti = mock.MagicMock
+
+        def when_ti_in_k8s_executor():
+            celery_executor_mock = mock.MagicMock()
+            k8s_executor_mock = mock.MagicMock()
+            ti.queue = "kubernetes"
+            cke = CeleryKubernetesExecutor(celery_executor_mock, 
k8s_executor_mock)
+
+            celery_executor_mock.try_adopt_task_instances.return_value = []
+            k8s_executor_mock.try_adopt_task_instances.return_value = []
+
+            cke.try_adopt_task_instances([ti])
+            
celery_executor_mock.try_adopt_task_instances.assert_called_once_with([])
+            
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
+
+        def when_ti_in_celery_executor():
+            celery_executor_mock = mock.MagicMock()
+            k8s_executor_mock = mock.MagicMock()
+            ti.queue = "default"
+            cke = CeleryKubernetesExecutor(celery_executor_mock, 
k8s_executor_mock)
+
+            celery_executor_mock.try_adopt_task_instances.return_value = []
+            k8s_executor_mock.try_adopt_task_instances.return_value = []
+
+            cke.try_adopt_task_instances([ti])
+            
celery_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
+            
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([])
+
+        when_ti_in_k8s_executor()
+        when_ti_in_celery_executor()
+
     def test_get_event_buffer(self):
         celery_executor_mock = mock.MagicMock()
         k8s_executor_mock = mock.MagicMock()

Reply via email to