This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 7338912 Add task adoption to CeleryKubernetesExecutor (#11244)
7338912 is described below
commit 7338912a78b87be9abcec197bff609b63e396fea
Author: Daniel Imberman <[email protected]>
AuthorDate: Fri Oct 2 11:51:11 2020 -0700
Add task adoption to CeleryKubernetesExecutor (#11244)
Routes task adoption based on queue name to CeleryExecutor
or KubernetesExecutor
Co-authored-by: Daniel Imberman <[email protected]>
---
airflow/executors/celery_kubernetes_executor.py | 24 +++++++++++++++-
tests/executors/test_celery_kubernetes_executor.py | 32 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/airflow/executors/celery_kubernetes_executor.py
b/airflow/executors/celery_kubernetes_executor.py
index 353474b..51c1e17 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from typing import Dict, Optional, Set, Union
+from typing import Dict, List, Optional, Set, Union
from airflow.configuration import conf
from airflow.executors.base_executor import CommandType, EventBufferValueType,
QueuedTaskInstanceType
@@ -131,6 +131,28 @@ class CeleryKubernetesExecutor(LoggingMixin):
return {**cleared_events_from_celery, **cleared_events_from_kubernetes}
+ def try_adopt_task_instances(self, tis: List[TaskInstance]) ->
List[TaskInstance]:
+ """
+ Try to adopt running task instances that have been abandoned by a
SchedulerJob dying.
+
+ Anything that is not adopted will be cleared by the scheduler (and
then become eligible for
+ re-scheduling)
+
+ :return: any TaskInstances that were unable to be adopted
+ :rtype: list[airflow.models.TaskInstance]
+ """
+ celery_tis = []
+ kubernetes_tis = []
+ abandoned_tis = []
+ for ti in tis:
+ if ti.queue == self.KUBERNETES_QUEUE:
+ kubernetes_tis.append(ti)
+ else:
+ celery_tis.append(ti)
+
abandoned_tis.extend(self.celery_executor.try_adopt_task_instances(celery_tis))
+
abandoned_tis.extend(self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis))
+ return abandoned_tis
+
def end(self) -> None:
"""
End celery and kubernetes executor
diff --git a/tests/executors/test_celery_kubernetes_executor.py
b/tests/executors/test_celery_kubernetes_executor.py
index 31b7336..872b1b7 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/executors/test_celery_kubernetes_executor.py
@@ -201,6 +201,38 @@ class TestCeleryKubernetesExecutor:
when_ti_in_k8s_executor()
when_ti_in_celery_executor()
+ def test_adopt_tasks(self):
+ ti = mock.MagicMock
+
+ def when_ti_in_k8s_executor():
+ celery_executor_mock = mock.MagicMock()
+ k8s_executor_mock = mock.MagicMock()
+ ti.queue = "kubernetes"
+ cke = CeleryKubernetesExecutor(celery_executor_mock,
k8s_executor_mock)
+
+ celery_executor_mock.try_adopt_task_instances.return_value = []
+ k8s_executor_mock.try_adopt_task_instances.return_value = []
+
+ cke.try_adopt_task_instances([ti])
+
celery_executor_mock.try_adopt_task_instances.assert_called_once_with([])
+
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
+
+ def when_ti_in_celery_executor():
+ celery_executor_mock = mock.MagicMock()
+ k8s_executor_mock = mock.MagicMock()
+ ti.queue = "default"
+ cke = CeleryKubernetesExecutor(celery_executor_mock,
k8s_executor_mock)
+
+ celery_executor_mock.try_adopt_task_instances.return_value = []
+ k8s_executor_mock.try_adopt_task_instances.return_value = []
+
+ cke.try_adopt_task_instances([ti])
+
celery_executor_mock.try_adopt_task_instances.assert_called_once_with([ti])
+
k8s_executor_mock.try_adopt_task_instances.assert_called_once_with([])
+
+ when_ti_in_k8s_executor()
+ when_ti_in_celery_executor()
+
def test_get_event_buffer(self):
celery_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()