This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8e35daec67f729f98a7d407230260a5b8ab1c60f
Author: mhenc <mh...@google.com>
AuthorDate: Tue May 10 19:13:00 2022 +0200

    Implement send_callback method for CeleryKubernetesExecutor and 
LocalKubernetesExecutor (#23617)
    
    (cherry picked from commit c5b72bf30c8b80b6c022055834fc7272a1a44526)
---
 airflow/executors/celery_kubernetes_executor.py    | 12 ++++++++++++
 airflow/executors/local_kubernetes_executor.py     | 12 ++++++++++++
 tests/executors/test_celery_kubernetes_executor.py | 12 ++++++++++++
 tests/executors/test_local_kubernetes_executor.py  | 12 ++++++++++++
 4 files changed, 48 insertions(+)

diff --git a/airflow/executors/celery_kubernetes_executor.py 
b/airflow/executors/celery_kubernetes_executor.py
index 5b934160ab..b1edc32235 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -17,6 +17,8 @@
 # under the License.
 from typing import Dict, List, Optional, Set, Union
 
+from airflow.callbacks.base_callback_sink import BaseCallbackSink
+from airflow.callbacks.callback_requests import CallbackRequest
 from airflow.configuration import conf
 from airflow.executors.base_executor import CommandType, EventBufferValueType, 
QueuedTaskInstanceType
 from airflow.executors.celery_executor import CeleryExecutor
@@ -35,6 +37,7 @@ class CeleryKubernetesExecutor(LoggingMixin):
     """
 
     supports_ad_hoc_ti_run: bool = True
+    callback_sink: Optional[BaseCallbackSink] = None
 
     KUBERNETES_QUEUE = conf.get('celery_kubernetes_executor', 
'kubernetes_queue')
 
@@ -204,3 +207,12 @@ class CeleryKubernetesExecutor(LoggingMixin):
         self.celery_executor.debug_dump()
         self.log.info("Dumping KubernetesExecutor state")
         self.kubernetes_executor.debug_dump()
+
+    def send_callback(self, request: CallbackRequest) -> None:
+        """Sends callback for execution.
+
+        :param request: Callback request to be executed.
+        """
+        if not self.callback_sink:
+            raise ValueError("Callback sink is not ready.")
+        self.callback_sink.send(request)
diff --git a/airflow/executors/local_kubernetes_executor.py 
b/airflow/executors/local_kubernetes_executor.py
index cb1ddf7c9d..9944cfe1ef 100644
--- a/airflow/executors/local_kubernetes_executor.py
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -17,6 +17,8 @@
 # under the License.
 from typing import Dict, List, Optional, Set, Union
 
+from airflow.callbacks.base_callback_sink import BaseCallbackSink
+from airflow.callbacks.callback_requests import CallbackRequest
 from airflow.configuration import conf
 from airflow.executors.base_executor import CommandType, EventBufferValueType, 
QueuedTaskInstanceType
 from airflow.executors.kubernetes_executor import KubernetesExecutor
@@ -35,6 +37,7 @@ class LocalKubernetesExecutor(LoggingMixin):
     """
 
     supports_ad_hoc_ti_run: bool = True
+    callback_sink: Optional[BaseCallbackSink] = None
 
     KUBERNETES_QUEUE = conf.get('local_kubernetes_executor', 
'kubernetes_queue')
 
@@ -203,3 +206,12 @@ class LocalKubernetesExecutor(LoggingMixin):
         self.local_executor.debug_dump()
         self.log.info("Dumping KubernetesExecutor state")
         self.kubernetes_executor.debug_dump()
+
+    def send_callback(self, request: CallbackRequest) -> None:
+        """Sends callback for execution.
+
+        :param request: Callback request to be executed.
+        """
+        if not self.callback_sink:
+            raise ValueError("Callback sink is not ready.")
+        self.callback_sink.send(request)
diff --git a/tests/executors/test_celery_kubernetes_executor.py 
b/tests/executors/test_celery_kubernetes_executor.py
index 84ca14c5f0..5681476274 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/executors/test_celery_kubernetes_executor.py
@@ -19,6 +19,7 @@ from unittest import mock
 
 from parameterized import parameterized
 
+from airflow.callbacks.callback_requests import CallbackRequest
 from airflow.configuration import conf
 from airflow.executors.celery_executor import CeleryExecutor
 from airflow.executors.celery_kubernetes_executor import 
CeleryKubernetesExecutor
@@ -223,3 +224,14 @@ class TestCeleryKubernetesExecutor:
         assert k8s_executor_mock.kubernetes_queue == conf.get(
             'celery_kubernetes_executor', 'kubernetes_queue'
         )
+
+    def test_send_callback(self):
+        cel_exec = CeleryExecutor()
+        k8s_exec = KubernetesExecutor()
+        cel_k8s_exec = CeleryKubernetesExecutor(cel_exec, k8s_exec)
+        cel_k8s_exec.callback_sink = mock.MagicMock()
+
+        callback = CallbackRequest(full_filepath="fake")
+        cel_k8s_exec.send_callback(callback)
+
+        cel_k8s_exec.callback_sink.send.assert_called_once_with(callback)
diff --git a/tests/executors/test_local_kubernetes_executor.py 
b/tests/executors/test_local_kubernetes_executor.py
index 274175f127..48d09ad99e 100644
--- a/tests/executors/test_local_kubernetes_executor.py
+++ b/tests/executors/test_local_kubernetes_executor.py
@@ -17,6 +17,7 @@
 # under the License.
 from unittest import mock
 
+from airflow.callbacks.callback_requests import CallbackRequest
 from airflow.configuration import conf
 from airflow.executors.local_executor import LocalExecutor
 from airflow.executors.local_kubernetes_executor import LocalKubernetesExecutor
@@ -67,3 +68,14 @@ class TestLocalKubernetesExecutor:
         LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
 
         assert k8s_executor_mock.kubernetes_queue == 
conf.get('local_kubernetes_executor', 'kubernetes_queue')
+
+    def test_send_callback(self):
+        local_executor_mock = mock.MagicMock()
+        k8s_executor_mock = mock.MagicMock()
+        local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, 
k8s_executor_mock)
+        local_k8s_exec.callback_sink = mock.MagicMock()
+
+        callback = CallbackRequest(full_filepath="fake")
+        local_k8s_exec.send_callback(callback)
+
+        local_k8s_exec.callback_sink.send.assert_called_once_with(callback)

Reply via email to