This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d568543d879d4a56c675dd1d510e14143f201c5e
Author: raphaelauv <raphael...@users.noreply.github.com>
AuthorDate: Wed Nov 22 11:23:46 2023 +0100

    feat: K8S resource operator - CRD (#35600)
    
    * feat: K8S resource operator - CRD
    
    * clean
    
    * tests
    
    * remove sensor ( for another PR )
    
    * clean
    
    * test on k8s_resource_iterator
---
 .../cncf/kubernetes/operators/resource.py          | 62 +++++++++++++++++----
 .../cncf/kubernetes/utils/k8s_resource_iterator.py | 46 +++++++++++++++
 .../cncf/kubernetes/operators/test_resource.py     | 63 +++++++++++++++++++++
 .../kubernetes/utils/test_k8s_resource_iterator.py | 65 ++++++++++++++++++++++
 4 files changed, 225 insertions(+), 11 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/resource.py 
b/airflow/providers/cncf/kubernetes/operators/resource.py
index 598731b639..569b5861a6 100644
--- a/airflow/providers/cncf/kubernetes/operators/resource.py
+++ b/airflow/providers/cncf/kubernetes/operators/resource.py
@@ -27,9 +27,10 @@ from kubernetes.utils import create_from_yaml
 from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 from airflow.providers.cncf.kubernetes.utils.delete_from import 
delete_from_yaml
+from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import 
k8s_resource_iterator
 
 if TYPE_CHECKING:
-    from kubernetes.client import ApiClient
+    from kubernetes.client import ApiClient, CustomObjectsApi
 
 __all__ = ["KubernetesCreateResourceOperator", 
"KubernetesDeleteResourceOperator"]
 
@@ -56,17 +57,23 @@ class KubernetesResourceBaseOperator(BaseOperator):
         yaml_conf: str,
         namespace: str | None = None,
         kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
+        custom_resource_definition: bool = False,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self._namespace = namespace
         self.kubernetes_conn_id = kubernetes_conn_id
         self.yaml_conf = yaml_conf
+        self.custom_resource_definition = custom_resource_definition
 
     @cached_property
     def client(self) -> ApiClient:
         return self.hook.api_client
 
+    @cached_property
+    def custom_object_client(self) -> CustomObjectsApi:
+        return self.hook.custom_object_client
+
     @cached_property
     def hook(self) -> KubernetesHook:
         hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
@@ -78,24 +85,57 @@ class KubernetesResourceBaseOperator(BaseOperator):
         else:
             return self.hook.get_namespace() or "default"
 
+    def get_crd_fields(self, body: dict) -> tuple[str, str, str, str]:
+        """Return ``(group, version, namespace, plural)`` for the custom resource in ``body``."""
+        # rpartition yields an empty group for a slash-less apiVersion such as "v1";
+        # slicing on find("/") == -1 would have produced ("v", "v1") instead.
+        group, _, version = body["apiVersion"].rpartition("/")
+
+        # The manifest's own namespace wins; otherwise fall back to the operator's.
+        metadata: dict = body.get("metadata") or {}
+        namespace = metadata.get("namespace")
+        if namespace is None:
+            namespace = self.get_namespace()
+
+        # Naive pluralisation: lowercased kind plus "s" (e.g. RayJob -> rayjobs).
+        plural = body["kind"].lower() + "s"
+        return group, version, namespace, plural
+
 
 class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
     """Create a resource in a kubernetes."""
 
+    def create_custom_from_yaml_object(self, body: dict):
+        group, version, namespace, plural = self.get_crd_fields(body)
+        self.custom_object_client.create_namespaced_custom_object(group, 
version, namespace, plural, body)
+
     def execute(self, context) -> None:
-        create_from_yaml(
-            k8s_client=self.client,
-            yaml_objects=yaml.safe_load_all(self.yaml_conf),
-            namespace=self.get_namespace(),
-        )
+        resources = yaml.safe_load_all(self.yaml_conf)
+        if not self.custom_resource_definition:
+            create_from_yaml(
+                k8s_client=self.client,
+                yaml_objects=resources,
+                namespace=self.get_namespace(),
+            )
+        else:
+            k8s_resource_iterator(self.create_custom_from_yaml_object, 
resources)
 
 
 class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
     """Delete a resource in a kubernetes."""
 
+    def delete_custom_from_yaml_object(self, body: dict):
+        name = body["metadata"]["name"]
+        group, version, namespace, plural = self.get_crd_fields(body)
+        self.custom_object_client.delete_namespaced_custom_object(group, 
version, namespace, plural, name)
+
     def execute(self, context) -> None:
-        delete_from_yaml(
-            k8s_client=self.client,
-            yaml_objects=yaml.safe_load_all(self.yaml_conf),
-            namespace=self.get_namespace(),
-        )
+        resources = yaml.safe_load_all(self.yaml_conf)
+        if not self.custom_resource_definition:
+            delete_from_yaml(
+                k8s_client=self.client,
+                yaml_objects=resources,
+                namespace=self.get_namespace(),
+            )
+        else:
+            k8s_resource_iterator(self.delete_custom_from_yaml_object, 
resources)
diff --git a/airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py 
b/airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py
new file mode 100644
index 0000000000..bfa1d05272
--- /dev/null
+++ b/airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Callable, Iterator
+
+from kubernetes.utils import FailToCreateError
+
+from airflow.providers.cncf.kubernetes.utils.delete_from import 
FailToDeleteError
+
+
+def k8s_resource_iterator(callback: Callable[[dict], None], resources: Iterator) -> None:
+    """Run ``callback`` on every resource document, expanding ``*List`` kinds into their items.
+
+    Errors raised by ``callback`` are collected and re-raised together as ``FailToCreateError``.
+    """
+    failures: list = []
+    for data in resources:
+        if data is None:  # empty YAML document
+            continue
+        # endswith, not ``in``: a substring test would treat kinds like "Listener" as lists.
+        is_list = data["kind"].endswith("List")
+        item_kind = data["kind"][: -len("List")] if is_list else ""
+        for yml_doc in data["items"] if is_list else [data]:
+            if item_kind:  # typed list (e.g. PodList): stamp apiVersion/kind onto each item
+                yml_doc["apiVersion"], yml_doc["kind"] = data["apiVersion"], item_kind
+            try:
+                callback(yml_doc)
+            except (FailToCreateError, FailToDeleteError) as failure:
+                failures.extend(failure.api_exceptions)
+    if failures:
+        raise FailToCreateError(failures)
diff --git a/tests/providers/cncf/kubernetes/operators/test_resource.py 
b/tests/providers/cncf/kubernetes/operators/test_resource.py
index a565e84fc1..9673c6e082 100644
--- a/tests/providers/cncf/kubernetes/operators/test_resource.py
+++ b/tests/providers/cncf/kubernetes/operators/test_resource.py
@@ -56,6 +56,16 @@ items:
     name: test_pvc_2
 """
 
+TEST_VALID_CRD_YAML = """
+apiVersion: ray.io/v1
+kind: RayJob
+metadata:
+  name: rayjob-sample
+spec:
+  entrypoint: python /home/ray/program/job.py
+  shutdownAfterJobFinishes: true
+"""
+
 HOOK_CLASS = 
"airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook"
 
 
@@ -89,6 +99,19 @@ class TestKubernetesXResourceOperator:
             body=yaml.safe_load(TEST_VALID_RESOURCE_YAML), namespace="default"
         )
 
+    
@patch("kubernetes.client.api.CoreV1Api.create_namespaced_persistent_volume_claim")
+    def test_create_application_from_yaml_list(self, 
mock_create_namespaced_persistent_volume_claim, context):
+        op = KubernetesCreateResourceOperator(
+            yaml_conf=TEST_VALID_LIST_RESOURCE_YAML,
+            dag=self.dag,
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_task_id",
+        )
+
+        op.execute(context)
+
+        assert mock_create_namespaced_persistent_volume_claim.call_count == 2
+
     
@patch("kubernetes.client.api.CoreV1Api.delete_namespaced_persistent_volume_claim")
     def test_single_delete_application_from_yaml(
         self, mock_delete_namespaced_persistent_volume_claim, context
@@ -118,3 +141,43 @@ class TestKubernetesXResourceOperator:
         op.execute(context)
 
         mock_delete_namespaced_persistent_volume_claim.assert_called()
+
+    
@patch("kubernetes.client.api.CustomObjectsApi.create_namespaced_custom_object")
+    def test_create_custom_application_from_yaml(self, 
mock_create_namespaced_custom_object, context):
+        op = KubernetesCreateResourceOperator(
+            yaml_conf=TEST_VALID_CRD_YAML,
+            dag=self.dag,
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_task_id",
+            custom_resource_definition=True,
+        )
+
+        op.execute(context)
+
+        mock_create_namespaced_custom_object.assert_called_once_with(
+            "ray.io",
+            "v1",
+            "default",
+            "rayjobs",
+            yaml.safe_load(TEST_VALID_CRD_YAML),
+        )
+
+    
@patch("kubernetes.client.api.CustomObjectsApi.delete_namespaced_custom_object")
+    def test_delete_custom_application_from_yaml(self, 
mock_delete_namespaced_custom_object, context):
+        op = KubernetesDeleteResourceOperator(
+            yaml_conf=TEST_VALID_CRD_YAML,
+            dag=self.dag,
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_task_id",
+            custom_resource_definition=True,
+        )
+
+        op.execute(context)
+
+        mock_delete_namespaced_custom_object.assert_called_once_with(
+            "ray.io",
+            "v1",
+            "default",
+            "rayjobs",
+            "rayjob-sample",
+        )
diff --git 
a/tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py 
b/tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py
new file mode 100644
index 0000000000..8a2e4f0539
--- /dev/null
+++ b/tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from collections import namedtuple
+
+import pytest
+import yaml
+from kubernetes.utils import FailToCreateError
+
+from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import 
k8s_resource_iterator
+
+TEST_VALID_LIST_RESOURCE_YAML = """
+apiVersion: v1
+kind: List
+items:
+- apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: test_pvc_1
+- apiVersion: v1
+  kind: PersistentVolumeClaim
+  metadata:
+    name: test_pvc_2
+"""
+
+
+def test_k8s_resource_iterator():
+    """Callback failures are aggregated into one error; a clean run raises nothing."""
+    exception_k8s = namedtuple("Exception_k8s", "reason body")
+
+    def test_callback_failing(yml_doc: dict) -> None:
+        # FailToCreateError takes a *list* of API exceptions, not a bare exception.
+        raise FailToCreateError([exception_k8s("the_reason", "the_body ")])
+
+    with pytest.raises(FailToCreateError) as exc_info:
+        k8s_resource_iterator(
+            test_callback_failing, resources=yaml.safe_load_all(TEST_VALID_LIST_RESOURCE_YAML)
+        )
+
+    # Asserted after the ``with`` block: code following the raising call inside it never runs.
+    assert (
+        str(exc_info.value)
+        == "Error from server (the_reason): the_body Error from server (the_reason): the_body "
+    )
+
+    def callback_success(yml_doc: dict) -> None:
+        return
+
+    # No try/except needed: any exception raised here already fails the test.
+    k8s_resource_iterator(callback_success, resources=yaml.safe_load_all(TEST_VALID_LIST_RESOURCE_YAML))

Reply via email to