MaksYermak commented on code in PR #37223:
URL: https://github.com/apache/airflow/pull/37223#discussion_r1499352165


##########
tests/system/providers/google/cloud/dataprocgdc/example_dataprocgdc.py:
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to create and monitor a Spark Application 
or App Environment
+ in a DPGDC cluster
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.google.cloud.operators.dataprocgdc import (
+    DataprocGdcCreateAppEnvironmentKrmOperator,
+    DataprocGDCSubmitSparkJobKrmOperator,
+)
+from airflow.providers.google.cloud.sensors.dataprocgdc import 
DataprocGDCKrmSensor
+
+DAG_ID = "example_dag_dpgdc-krm"
+
+with DAG(
+    DAG_ID, schedule="@once", catchup=False, start_date=datetime(2024, 2, 5), 
tags=["example", "dataprocgdc"]
+) as dag:
+    submitSparkJobOperator = DataprocGDCSubmitSparkJobKrmOperator(
+        task_id="example-dataprocgdc-submitspark-operator",
+        trigger_rule="all_success",
+        depends_on_past=False,
+        retries=1,
+        application_file="sparkpi.yaml",
+        namespace="default",
+        kubernetes_conn_id="myk8s",

Review Comment:
   Does this operator work with GKE cluster configuration?



##########
airflow/providers/google/cloud/sensors/dataprocgdc.py:
##########
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Sequence
+
+from kubernetes import client
+
+from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class DataprocGDCKrmSensor(BaseSensorOperator):

Review Comment:
   Is it possible to use this `SparkKubernetesSensor` 
(https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py#L33)
 for this purpose or maybe you can adopt this sensor for GDC cluster?



##########
tests/system/providers/google/cloud/dataprocgdc/example_dataprocgdc.py:
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to create and monitor a Spark Application 
or App Environment
+ in a DPGDC cluster
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.google.cloud.operators.dataprocgdc import (
+    DataprocGdcCreateAppEnvironmentKrmOperator,
+    DataprocGDCSubmitSparkJobKrmOperator,
+)
+from airflow.providers.google.cloud.sensors.dataprocgdc import 
DataprocGDCKrmSensor
+
+DAG_ID = "example_dag_dpgdc-krm"
+
+with DAG(

Review Comment:
   Did you check this DAG on your local env?



##########
airflow/providers/google/cloud/operators/dataprocgdc.py:
##########
@@ -0,0 +1,272 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Dataproc GDC operators using KRM APIs."""
+
+from __future__ import annotations
+
+import datetime
+from functools import cached_property
+from typing import TYPE_CHECKING, Sequence
+
+from kubernetes.client import ApiException, CoreV1Api
+from kubernetes.watch import Watch
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, 
_load_body_to_dict
+
+if TYPE_CHECKING:
+    from kubernetes.client.models import CoreV1EventList
+
+    from airflow.utils.context import Context
+
+
+class DataprocGDCSubmitSparkJobKrmOperator(BaseOperator):
+    """
+    Creates sparkApplication object in dpgdc cluster using KRM APIs.
+
+    :param application_file: Defines gdc 'custom_resource_definition' of
+    'sparkApplication'
+        path to a '.yaml' file, '.json' file, YAML string or python dictionary.
+    :param namespace: kubernetes namespace to put sparkApplication
+    :param kubernetes_conn_id: The :ref:`kubernetes connection id
+    <howto/connection:kubernetes>` for the GDC cluster.
+    :param api_group: DPGDC api group
+    :param api_version: DPGDC api version
+    :param watch: whether to watch the job status and logs or not
+    """
+
+    template_fields: Sequence[str] = "application_file"
+    template_ext: Sequence[str] = (".yaml", ".yml", ".json")
+    ui_color = "#f4a460"
+
+    def __init__(
+        self,
+        *,
+        application_file: str | dict,
+        kubernetes_conn_id: str = "myk8s",
+        config_file: str | None = None,
+        cluster_context: str | None = None,
+        namespace: str = "default",
+        api_group: str = "dataprocgdc.cloud.google.com",
+        api_version: str = "v1alpha1",
+        in_cluster: bool = False,
+        watch: bool = True,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.application_file = application_file
+        self.kubernetes_conn_id = kubernetes_conn_id
+        self.config_file = config_file
+        self.cluster_context = cluster_context
+        self.namespace = namespace
+        self.api_group = api_group
+        self.api_version = api_version
+        self.in_cluster = in_cluster
+        self.watch = watch
+        self.plural = "sparkapplications"
+
+    @cached_property
+    def hook(self) -> KubernetesHook:
+        hook = KubernetesHook(
+            conn_id=self.kubernetes_conn_id,
+            in_cluster=self.in_cluster,
+            config_file=self.config_file,
+            cluster_context=self.cluster_context,
+        )
+        return hook
+
+    @cached_property
+    def client(self) -> CoreV1Api:
+        return self.hook.core_v1_client
+
+    def _get_namespace_event_stream(self, namespace, query_kwargs=None):
+        try:
+            return Watch().stream(
+                self.hook.core_v1_client.list_namespaced_event,
+                namespace=namespace,
+                watch=True,
+                **(query_kwargs or {}),
+            )
+        except ApiException as e:
+            if e.status == 410:  # Resource version is too old
+                events: CoreV1EventList = 
self.hook.core_v1_client.list_namespaced_event(
+                    namespace=namespace, watch=False
+                )
+                resource_version = events.metadata.resource_version
+                query_kwargs["resource_version"] = resource_version
+                return self._get_namespace_event_stream(namespace, 
query_kwargs)
+            else:
+                raise
+
+    def execute(self, context: Context):
+        if isinstance(self.application_file, str):
+            body = _load_body_to_dict(self.application_file)
+        else:
+            body = self.application_file
+        name = body["metadata"]["name"]
+        namespace = self.namespace or self.hook.get_namespace()
+
+        response = None
+        is_job_created = False
+        if self.watch:
+            try:
+                namespace_event_stream = self._get_namespace_event_stream(
+                    namespace=namespace,
+                    query_kwargs={
+                        "field_selector": 
f"involvedObject.kind=SparkApplication,involvedObject.name={name}"
+                    },
+                )
+
+                response = self.hook.create_custom_object(
+                    group=self.api_group,
+                    version=self.api_version,
+                    plural=self.plural,
+                    body=body,
+                    namespace=namespace,
+                )
+
+                is_job_created = True
+                for event in namespace_event_stream:
+                    obj = event["object"]
+                    if event["object"].last_timestamp >= 
datetime.datetime.strptime(
+                        response["metadata"]["creationTimestamp"], 
"%Y-%m-%dT%H:%M:%S%z"
+                    ):
+                        self.log.info(obj.message)
+                        if obj.reason == "SparkDriverRunning":
+                            pod_log_stream = Watch().stream(
+                                
self.hook.core_v1_client.read_namespaced_pod_log,
+                                name=f"{name}-driver",
+                                namespace=namespace,
+                                timestamps=True,
+                            )
+                            for line in pod_log_stream:
+                                self.log.info(line)
+                        elif obj.reason in [
+                            "SparkApplicationSubmissionFailed",
+                            "SparkApplicationFailed",
+                            "SparkApplicationDeleted",
+                        ]:
+                            is_job_created = False
+                            raise AirflowException(obj.message)
+                        elif obj.reason == "SparkApplicationCompleted":
+                            break
+                        else:
+                            continue
+            except Exception:
+                if is_job_created:
+                    self.on_kill()
+                raise
+        else:
+            response = self.hook.create_custom_object(
+                group=self.api_group,
+                version=self.api_version,
+                plural=self.plural,
+                body=body,
+                namespace=namespace,
+            )
+
+        return response
+
+    def on_kill(self) -> None:
+        if isinstance(self.application_file, str):
+            body = _load_body_to_dict(self.application_file)
+        else:
+            body = self.application_file
+        name = body["metadata"]["name"]
+        namespace = self.namespace or self.hook.get_namespace()
+        self.hook.delete_custom_object(
+            group=self.api_group, version=self.api_version, 
plural=self.plural, namespace=namespace, name=name
+        )
+
+
+class DataprocGdcCreateAppEnvironmentKrmOperator(BaseOperator):

Review Comment:
   For me these operators `DataprocGDCSubmitSparkJobKrmOperator`  and 
`DataprocGdcCreateAppEnvironmentKrmOperator` look quite similar to 
`KubernetesCreateResourceOperator` 
operator(https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/resource.py#L107).
 Is it possible to use `KubernetesCreateResourceOperator` for your purpose or 
maybe you can be inherited from this operator?
   
   Also, in this PR(https://github.com/apache/airflow/pull/37616) I have added 
ability to create namespaced 'custom_resource_definition' object and create 
this object on GKE cluster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to