This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8e35daec67f729f98a7d407230260a5b8ab1c60f Author: mhenc <mh...@google.com> AuthorDate: Tue May 10 19:13:00 2022 +0200 Implement send_callback method for CeleryKubernetesExecutor and LocalKubernetesExecutor (#23617) (cherry picked from commit c5b72bf30c8b80b6c022055834fc7272a1a44526) --- airflow/executors/celery_kubernetes_executor.py | 12 ++++++++++++ airflow/executors/local_kubernetes_executor.py | 12 ++++++++++++ tests/executors/test_celery_kubernetes_executor.py | 12 ++++++++++++ tests/executors/test_local_kubernetes_executor.py | 12 ++++++++++++ 4 files changed, 48 insertions(+) diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py index 5b934160ab..b1edc32235 100644 --- a/airflow/executors/celery_kubernetes_executor.py +++ b/airflow/executors/celery_kubernetes_executor.py @@ -17,6 +17,8 @@ # under the License. from typing import Dict, List, Optional, Set, Union +from airflow.callbacks.base_callback_sink import BaseCallbackSink +from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType from airflow.executors.celery_executor import CeleryExecutor @@ -35,6 +37,7 @@ class CeleryKubernetesExecutor(LoggingMixin): """ supports_ad_hoc_ti_run: bool = True + callback_sink: Optional[BaseCallbackSink] = None KUBERNETES_QUEUE = conf.get('celery_kubernetes_executor', 'kubernetes_queue') @@ -204,3 +207,12 @@ class CeleryKubernetesExecutor(LoggingMixin): self.celery_executor.debug_dump() self.log.info("Dumping KubernetesExecutor state") self.kubernetes_executor.debug_dump() + + def send_callback(self, request: CallbackRequest) -> None: + """Sends callback for execution. + + :param request: Callback request to be executed. 
+ """ + if not self.callback_sink: + raise ValueError("Callback sink is not ready.") + self.callback_sink.send(request) diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py index cb1ddf7c9d..9944cfe1ef 100644 --- a/airflow/executors/local_kubernetes_executor.py +++ b/airflow/executors/local_kubernetes_executor.py @@ -17,6 +17,8 @@ # under the License. from typing import Dict, List, Optional, Set, Union +from airflow.callbacks.base_callback_sink import BaseCallbackSink +from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType from airflow.executors.kubernetes_executor import KubernetesExecutor @@ -35,6 +37,7 @@ class LocalKubernetesExecutor(LoggingMixin): """ supports_ad_hoc_ti_run: bool = True + callback_sink: Optional[BaseCallbackSink] = None KUBERNETES_QUEUE = conf.get('local_kubernetes_executor', 'kubernetes_queue') @@ -203,3 +206,12 @@ class LocalKubernetesExecutor(LoggingMixin): self.local_executor.debug_dump() self.log.info("Dumping KubernetesExecutor state") self.kubernetes_executor.debug_dump() + + def send_callback(self, request: CallbackRequest) -> None: + """Sends callback for execution. + + :param request: Callback request to be executed. 
+ """ + if not self.callback_sink: + raise ValueError("Callback sink is not ready.") + self.callback_sink.send(request) diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py index 84ca14c5f0..5681476274 100644 --- a/tests/executors/test_celery_kubernetes_executor.py +++ b/tests/executors/test_celery_kubernetes_executor.py @@ -19,6 +19,7 @@ from unittest import mock from parameterized import parameterized +from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.executors.celery_executor import CeleryExecutor from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor @@ -223,3 +224,14 @@ class TestCeleryKubernetesExecutor: assert k8s_executor_mock.kubernetes_queue == conf.get( 'celery_kubernetes_executor', 'kubernetes_queue' ) + + def test_send_callback(self): + cel_exec = CeleryExecutor() + k8s_exec = KubernetesExecutor() + cel_k8s_exec = CeleryKubernetesExecutor(cel_exec, k8s_exec) + cel_k8s_exec.callback_sink = mock.MagicMock() + + callback = CallbackRequest(full_filepath="fake") + cel_k8s_exec.send_callback(callback) + + cel_k8s_exec.callback_sink.send.assert_called_once_with(callback) diff --git a/tests/executors/test_local_kubernetes_executor.py b/tests/executors/test_local_kubernetes_executor.py index 274175f127..48d09ad99e 100644 --- a/tests/executors/test_local_kubernetes_executor.py +++ b/tests/executors/test_local_kubernetes_executor.py @@ -17,6 +17,7 @@ # under the License. 
from unittest import mock +from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.executors.local_executor import LocalExecutor from airflow.executors.local_kubernetes_executor import LocalKubernetesExecutor @@ -67,3 +68,14 @@ class TestLocalKubernetesExecutor: LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock) assert k8s_executor_mock.kubernetes_queue == conf.get('local_kubernetes_executor', 'kubernetes_queue') + + def test_send_callback(self): + local_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock) + local_k8s_exec.callback_sink = mock.MagicMock() + + callback = CallbackRequest(full_filepath="fake") + local_k8s_exec.send_callback(callback) + + local_k8s_exec.callback_sink.send.assert_called_once_with(callback)